I'm trying to extract predictions from a MatrixFactorizationModel by mapping an RDD of users over the model's recommendProducts method. This gives me a MapPartitionsRDD, but trying to reduce or otherwise access that RDD throws a SparkException.
Here is the simplified code:
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import org.apache.spark.mllib.recommendation.{ALS, Rating, MatrixFactorizationModel}
// The user IDs we want recommendations for
val users = sc.parallelize(List(1, 2))
// Small implicit-feedback training set
val trainingData = sc.parallelize(List(Rating(1,1,0.5), Rating(1,2,0.5), Rating(2,1,1), Rating(2,3,1))).cache()
// rank = 6, iterations = 20, lambda = 0.1, alpha = 2
val model = ALS.trainImplicit(trainingData, 6, 20, 0.1, 2)
// Map each user to its top-2 recommendations
val recommendations = users.map(model.recommendProducts(_, 2))
recommendations.first
The error thrown on the last line:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 11500.0 failed 1 times, most recent failure: Lost task 2.0 in stage 11500.0 (TID 6401, localhost): org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:928)
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.recommendProducts(MatrixFactorizationModel.scala:168)
My only theory is that a MapPartitionsRDD has not actually applied its function at the time it is created, so if the model's recommendProducts method performs some RDD operation internally, perhaps that operation is only invoked when the data is accessed, and we end up with a nested RDD call. If that is the case, does it mean it is impossible to parallelize any operation over a MatrixFactorizationModel?
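For reference, here is a hedged sketch of the two workarounds I am considering, reusing the `model` and `users` values from the code above. The first collects the (small) user list to the driver so that recommendProducts runs there; the second uses recommendProductsForUsers, which (in Spark 1.4+) returns the top-k recommendations for all users as a distributed RDD. I have not verified these behave identically in all cases:

```scala
// Option 1: pull the user IDs to the driver and call
// recommendProducts locally -- no nested RDD operations.
// Fine when the user list is small enough to collect.
val recsLocal: Array[Array[Rating]] =
  users.collect().map(u => model.recommendProducts(u, 2))

// Option 2 (Spark 1.4+): let the model compute top-2
// recommendations for every user in the training data,
// returned as an RDD of (userId, recommendations).
val recsRDD: RDD[(Int, Array[Rating])] =
  model.recommendProductsForUsers(2)
```

Option 2 stays fully distributed, but note it produces recommendations for every user seen in training, not just the IDs in `users`, so a filter or join may be needed afterwards.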