
Spark - Prediction.io - scala.MatchError: null

I'm working on a prediction.io template and I've run into trouble with Spark.

I keep getting a scala.MatchError (full gist here):

scala.MatchError: null 
at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:831) 
at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:66) 
at org.template.prediction.ALSAlgorithm$$anonfun$predict$1$$anonfun$apply$1.apply(ALSAlgorithm.scala:86) 
at org.template.prediction.ALSAlgorithm$$anonfun$predict$1$$anonfun$apply$1.apply(ALSAlgorithm.scala:79) 
at scala.Option.map(Option.scala:145) 
at org.template.prediction.ALSAlgorithm$$anonfun$predict$1.apply(ALSAlgorithm.scala:79) 
at org.template.prediction.ALSAlgorithm$$anonfun$predict$1.apply(ALSAlgorithm.scala:78) 

The code (GitHub source here):

val usersWithCounts =
  ratingsRDD
    .map(r => (r.user, (1, Seq[Rating](Rating(r.user, r.item, r.rating)))))
    .reduceByKey((v1, v2) => (v1._1 + v2._1, v1._2.union(v2._2)))
    .filter(_._2._1 >= evalK)

// create evalK folds of ratings
(0 until evalK).map { idx =>
  // start by getting this fold's ratings for each user
  val fold = usersWithCounts
    .map { userKV =>
      val userRatings = userKV._2._2.zipWithIndex
      val trainingRatings = userRatings.filter(_._2 % evalK != idx).map(_._1)
      val testingRatings = userRatings.filter(_._2 % evalK == idx).map(_._1)
      (trainingRatings, testingRatings) // split the user's ratings into a training set and a testing set
    }
    .reduce((l, r) => (l._1.union(r._1), l._2.union(r._2))) // merge all the per-user training and testing sets into a single training set and testing set

  val testingSet = fold._2.map { r =>
    (new Query(r.user, r.item), new ActualResult(r.rating))
  }

  (
    new TrainingData(sc.parallelize(fold._1)),
    new EmptyEvaluationInfo(),
    sc.parallelize(testingSet)
  )
}

To do the evaluation I need to split the ratings into a training set and a testing set. To make sure every user shows up in the training data, I group all of each user's ratings together, split each user's ratings into folds, and then union the per-user folds back together.
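To make the modulo split concrete, here is what happens for a single user, as a tiny standalone example (plain Scala, made-up ratings, evalK = 3, fold idx = 0):

val evalK = 3
val idx = 0 // current fold
val userRatings = Seq("r0", "r1", "r2", "r3", "r4", "r5").zipWithIndex
val trainingRatings = userRatings.filter(_._2 % evalK != idx).map(_._1) // Seq(r1, r2, r4, r5)
val testingRatings  = userRatings.filter(_._2 % evalK == idx).map(_._1) // Seq(r0, r3)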

Maybe there's a better way to do this?
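For example, I wonder whether groupByKey would express the per-user grouping more directly. An untested sketch with the same ratingsRDD and evalK (the value becomes an Iterable[Rating] instead of a (count, Seq[Rating]) pair, so the fold code would use userKV._2 rather than userKV._2._2):

val usersWithRatings =
  ratingsRDD
    .map(r => (r.user, Rating(r.user, r.item, r.rating)))
    .groupByKey() // RDD[(user, Iterable[Rating])]
    .filter(_._2.size >= evalK) // keep only users with at least evalK ratings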

Answer


The error indicates that the MLlib MatrixFactorizationModel's userFeatures doesn't contain the user id (for example, because that user isn't in the training data). MLlib doesn't check for this after the lookup (it just calls .head): https://github.com/apache/spark/blob/v1.2.0/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala#L66
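For reference, the predict at that line does the two lookups and immediately takes .head, so an unknown user or item has nothing to fall back on (paraphrased from the linked Spark 1.2 source):

def predict(user: Int, product: Int): Double = {
  val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
  val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
  userVector.dot(productVector)
}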

To check whether that is the case, instead of calling the default predict on the model:

val itemScore = model.predict(userInt, itemInt)

(https://github.com/nickpoorman/template-scala-parallel-prediction/blob/master/src/main/scala/ALSAlgorithm.scala#L80)

change it to do the lookups yourself with .headOption:

val itemScore = model.userFeatures.lookup(userInt).headOption.map { userFeature =>
  model.productFeatures.lookup(itemInt).headOption.map { productFeature =>
    val userVector = new DoubleMatrix(userFeature)
    val productVector = new DoubleMatrix(productFeature)
    userVector.dot(productVector)
  }.getOrElse {
    logger.info(s"No itemFeature for item ${query.item}.")
    0.0 // return default score
  }
}.getOrElse {
  logger.info(s"No userFeature for user ${query.user}.")
  0.0 // return default score
}