2016-02-03

Spark RDD transformation exception when accessing a MapPartitionsRDD

I am trying to extract predictions from a MatrixFactorizationModel by mapping an RDD of users through the model's recommendProducts method. This gives me a MapPartitionsRDD, but any attempt to reduce or otherwise access that RDD throws a SparkException.

Here is the simplified code:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.rdd._ 
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel} 

val users = sc.parallelize(List(1,2)) 
val trainingData = sc.parallelize(List(Rating(1,1,0.5),Rating(1,2,0.5),Rating(2,1,1),Rating(2,3,1))).cache() 

val model = ALS.trainImplicit(trainingData, 6, 20, 0.1, 2) 

val recommendations = users.map(model.recommendProducts(_,2)) 

recommendations.first 

The error thrown on the last line:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 11500.0 failed 1 times, most recent failure: Lost task 2.0 in stage 11500.0 (TID 6401, localhost): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. 
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:928) 
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:168) 

My only theory is that when a MapPartitionsRDD is created, the mapped function has not actually been applied yet. So if the model's recommendProducts method performs some RDD operation internally, perhaps that operation is only invoked once the data is accessed, and we end up with a nested RDD call. If that is the case, does it mean it is impossible to perform any operation on a MatrixFactorizationModel in parallel?
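The failure mode the exception describes can be reproduced with a minimal snippet (the names rdd1 and rdd2 are illustrative, following the example given in the exception message itself):

```scala
// Invalid: an action on rdd2 is invoked inside a transformation on rdd1.
// RDD transformations and actions can only be called from the driver,
// so evaluating `broken` throws the same SPARK-5063 SparkException.
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(4 to 6)

val broken = rdd1.map(x => rdd2.count() * x)
broken.collect() // throws org.apache.spark.SparkException
```

The call users.map(model.recommendProducts(_, 2)) is exactly this pattern: recommendProducts performs a lookup on the model's internal feature RDDs, and that lookup runs inside the map over users.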

Answer


As I suspected: looking at the source of MatrixFactorizationModel, I can see that it stores the user and product features internally as RDDs. Any call into the model must therefore be made from the driver. To make my code run, I had to collect my users to the driver and use the plain, non-RDD version of map:

val recommendations = users.collect.toList.map(model.recommendProducts(_,2)) 

recommendations.head 
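Note that collect pulls all users to the driver, so this does not scale to large user sets. From Spark 1.4 onward, MatrixFactorizationModel also provides recommendProductsForUsers, which computes the top-N products for every user as an RDD on the cluster; joining it against the users RDD keeps the work distributed (a sketch, assuming the Spark 1.4+ API and the same sc, users, and model as above):

```scala
// recommendProductsForUsers(num) returns RDD[(Int, Array[Rating])] holding
// the top `num` products for every user in the model (Spark 1.4+).
val allRecs = model.recommendProductsForUsers(2)

// Restrict to the users of interest via a join, staying fully on the cluster.
val wanted = users.map(u => (u, ()))
val recommendations = allRecs.join(wanted).mapValues { case (ratings, _) => ratings }

recommendations.first
```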