
Generating a recommendation model for a large dataset using Spark

I am trying to generate a simple ALS model using the Spark documentation here. My first file (ratings.csv) has 20 million rows of UserID, MovieID, Rating and can be downloaded here.

So I have test data, which is a subset of ratings.csv. This test dataset can be downloaded here. The test file has only the UserID and MovieID columns.

So, to create the training data we have to filter ratings.csv. The following code works fine for the smaller case of 100,000 UserID/MovieID ratings, but I am unable to generate the model for the large case. Please help with pointers.

/** 
    * Created by echoesofconc on 3/8/17. 
    */ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel 
import org.apache.spark.mllib.recommendation.Rating 
import java.io._ 
import scala.collection.mutable.ListBuffer 


object Prateek_Agrawal_task1 { 

    def dropheader(data: RDD[String]): RDD[String] = { 
    data.mapPartitionsWithIndex((idx, lines) => { 
     if (idx == 0) { 
     lines.drop(1) 
     } 
     lines 
    }) 
    } 

    def create_training(ratings_split: RDD[Array[String]], ratings_testing: Array[Array[String]]) = { 
    ratings_split.filter(x => { 
     ratings_testing.exists(y => 
     (x(0) == y(0) && x(1) == y(1)) 
    ) == false 
    }) 
    } 
    def create_testing(ratings_split: RDD[Array[String]], ratings_testing: Array[Array[String]]) = { 
    ratings_split.filter(x => { 
     ratings_testing.exists(y => 
     (x(0) == y(0) && x(1) == y(1)) 
    ) == true 
    }) 
    } 
    def create_model(ratings_train:RDD[Array[String]],rank:Int,numIterations:Int):org.apache.spark.mllib.recommendation.MatrixFactorizationModel={ 
    val ratings = ratings_train.map(_ match { case Array(user,item,rate,temp) => 
     Rating(user.toInt, item.toInt, rate.toDouble) 
    }) 
    val model = ALS.train(ratings, rank, numIterations, 0.01) 
    return model 
    } 

    def print_results(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
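    // Histogram of predictions by integer bucket: truncate each prediction, count per
    // bucket, then fold exact 5.0 values into the >=4 bin so the last bin covers [4, 5].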
    val rating_range=final_predictions_adjusted.map(x=>(x._2.toInt,1)).reduceByKey(_+_).sortByKey() 
    val rating_range_till_4=rating_range.map{x=> 
     var temp=x 
     if (x._1==5){temp=(4,x._2)} 
     temp 
    }.reduceByKey(_+_) 
    rating_range_till_4.sortByKey().foreach { x => 
     if(x._1==0) 
     printf(">=0 and <1: " + x._2+"\n") 
     if(x._1==1) 
     printf(">=1 and <2: " + x._2+"\n") 
     if(x._1==2) 
     printf(">=2 and <3: " + x._2+"\n") 
     if(x._1==3) 
     printf(">=3 and <4: " + x._2+"\n") 
     if(x._1==4) 
     printf(">=4 " + x._2+"\n") 
     if(x._1==5) 
     printf("=5 " + x._2+"\n") 
    } 
    } 
    case class User_mov_rat(UserID: Int, MovieID:Int, Pred_rating: Double) 

    def print_outputfile(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
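    // Note: collect() pulls every prediction to the driver before writing; fine for the
    // test-set-sized output here, but it would not scale to the full ratings file.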
    val writer = new FileWriter(new File("./output.txt")) 
    writer.write("UserID,MovieID,Pred_rating\n") 
    final_predictions_adjusted.collect().foreach(x=>{writer.write(x._1._1+","+x._1._2+","+x._2+"\n")}) 
    writer.close() 
    } 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Prateek_Agrawal_task1").setMaster("local[2]") 
    val sc = new SparkContext(conf) 


    val file = "/Users/echoesofconc/Documents/USC_courses/INF553/ml-20m/ratings.csv" 
    val test = "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/testing_20m.csv" 

    val data = sc.textFile(file, 2).cache() 
    val data_test = sc.textFile(test, 2).cache() 

// Drop Header 
    val data_wo_header=dropheader(data).persist() 
    val data_test_wo_header=dropheader(data_test).persist() 
// Create Training and testing data of the format (User ID, MovID, Rating, Time) 

    val ratings_split = data_wo_header.map(line => line.split(",")).persist() 
    data_wo_header.unpersist() 
    data.unpersist() 
    val ratings_testing = data_test_wo_header.map(line => line.split(",")).collect() 
    data_test_wo_header.unpersist() 
    data_test.unpersist() 

    val ratings_train = create_training(ratings_split, ratings_testing).persist() 
    val ratings_test=create_testing(ratings_split, ratings_testing) 
    ratings_split.unpersist() 
    ratings_test.unpersist() 

// Create the model using ratings_train, the training data 
    val rank = 1 
    val numIterations = 10 

    val model=create_model(ratings_train,rank,numIterations) 
    ratings_train.unpersist() 

// Per-user average rating, computed here from the test split of ratings.csv; used as a fallback for (user, movie) pairs the model cannot predict 
    val user_avgrat = ratings_test
      .map { case Array(user, mov, rate, temp) => (user.toInt, (rate.toDouble, 1.0)) }
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, count) => sum / count }

// Predict user_mov ratings 
    val user_mov = data_test_wo_header.map(_.split(',') match { case Array(user, mov) => 
     (user.toInt,mov.toInt) 
    }) 

    val predictions = 
    model.predict(user_mov).map { case Rating(user, mov, rate) => 
     ((user, mov), rate) 
    } 
// Combine the predictions with the (user, movie) pairs the model could not predict. Unpredicted pairs get a 0.0 placeholder for now; the fallback accuracy for these should be improved going forward. 
    val user_mov_rat=user_mov.map(x=>(x,0.0)) 
    val predictions_unpredicted_combined= predictions.union(user_mov_rat).reduceByKey(_+_).map(x=>(x._1._1,(x._1._2,x._2))) 
// Combine average rating and predictions+unpredicted values 
    val avg_rating_predictions_unpredicted_combined=predictions_unpredicted_combined.join(user_avgrat) 
// Generate final predictions RDD 
    // Use the user's average rating wherever the model produced no prediction (0.0 placeholder)
    val final_predictions = avg_rating_predictions_unpredicted_combined.map {
      case (user, ((mov, pred), avg)) =>
        ((user, mov), if (pred == 0.0) avg else pred)
    }
// Adjust for ratings above 5.0 and below 0.0 
    val final_predictions_adjusted=final_predictions.map{x=> 
     var temp=x 
     if (x._2>5.0){temp=(x._1,5.0)} 
     if (x._2<0.0){temp=(x._1,0.0)} 
     temp 
    } 
    val ratesAndPreds = ratings_test.map(_ match { case Array(user, mov, rate, temp) => ((user.toInt,mov.toInt),rate.toDouble)}).join(final_predictions_adjusted) 

    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => 
     val err = (r1 - r2) 
     err * err 
    }.mean() 
    val RMSE=math.sqrt(MSE) 
// Print output.txt 
    print_outputfile(final_predictions_adjusted) 
// Print the prediction results 
    print_results(final_predictions_adjusted.sortByKey()) 
    print(RMSE+"\n") 
    } 
} 

In case anyone thinks I should be doing regex matching: I have already tried that approach, and it does not seem to be the bottleneck.
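For reference, the regex variant looked roughly like this (a reconstructed sketch, not the exact code; it still scans the whole collected test set once per line, which is why it is no faster):

    // Hypothetical reconstruction: build one anchored pattern per test line
    // ("user,movie,") and keep only the training lines matching none of them.
    val testPatterns = data_test_wo_header.collect()
      .map(l => ("^" + java.util.regex.Pattern.quote(l) + ",").r)
    val training_set = data_wo_header.filter { line =>
      !testPatterns.exists(_.findFirstIn(line).isDefined)
    }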

I only need to get the model-creation part working; that is where I am stuck for the large dataset. Can somebody help?

EDIT: Another approach I tried, using broadcast variables, is faster. But it has now been running for 12 hours with no sign of progress. In the Spark UI, somehow the whole RDD (ratings.csv, ~500 MB) is not being cached; only about 2.5 million lines (~64 MB) are processed initially. I am using --executor-memory 8g. I modified the create_training and create_testing functions as follows:

/** 
    * Created by echoesofconc on 3/8/17. 
    */ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.Rating 
import java.io._ 



object Prateek_Agrawal_task2 { 

    def dropheader(data: RDD[String]): RDD[String] = { 
    data.mapPartitionsWithIndex((idx, lines) => { 
     if (idx == 0) { 
     lines.drop(1) 
     } 
     lines 
    }) 
    } 
    def create_training(data_wo_header: RDD[String], data_test_wo_header: RDD[String],sc:SparkContext): RDD[String] = { 

    val rdd2array = sc.broadcast(data_test_wo_header.collect()) 
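     // Every training line is checked with a linear scan over this collected array,
     // so the filter below performs O(|train| * |test|) string comparisons in total.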
    val training_set = data_wo_header.filter{ 
     case(x) => rdd2array.value.filter(y => x.indexOf(y.toString())==0).length == 0 
    } 
    return training_set 
    } 
    def create_test(data_wo_header: RDD[String], data_test_wo_header: RDD[String],sc:SparkContext): RDD[String] = { 

    val rdd2array = sc.broadcast(data_test_wo_header.collect()) 
    val training_set = data_wo_header.filter{ 
     case(x) => rdd2array.value.filter(y => x.indexOf(y.toString())==0).length != 0 
    } 
    return training_set 
    } 

    def create_model(ratings_train:RDD[String],rank:Int,numIterations:Int):org.apache.spark.mllib.recommendation.MatrixFactorizationModel={ 
    val ratings = ratings_train.map(_.split(',') match { case Array(user, item, rate, timestamp) => 
     Rating(user.toInt, item.toInt, rate.toDouble) 
    }) 
    val model = ALS.train(ratings, rank, numIterations, 0.01) 
    return model 
    } 

    def print_results(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
    val rating_range=final_predictions_adjusted.map(x=>(x._2.toInt,1)).reduceByKey(_+_).sortByKey() 
    val rating_range_till_4=rating_range.map{x=> 
     var temp=x 
     if (x._1==5){temp=(4,x._2)} 
     temp 
    }.reduceByKey(_+_) 
    rating_range_till_4.sortByKey().foreach { x => 
     if(x._1==0) 
     printf(">=0 and <1: " + x._2+"\n") 
     if(x._1==1) 
     printf(">=1 and <2: " + x._2+"\n") 
     if(x._1==2) 
     printf(">=2 and <3: " + x._2+"\n") 
     if(x._1==3) 
     printf(">=3 and <4: " + x._2+"\n") 
     if(x._1==4) 
     printf(">=4 " + x._2+"\n") 
     if(x._1==5) 
     printf("=5 " + x._2+"\n") 
    } 
    } 
    case class User_mov_rat(UserID: Int, MovieID:Int, Pred_rating: Double) 

    def print_outputfile(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
    val writer = new FileWriter(new File("./output.txt")) 
    writer.write("UserID,MovieID,Pred_rating\n") 
    final_predictions_adjusted.collect().foreach(x=>{writer.write(x._1._1+","+x._1._2+","+x._2+"\n")}) 
    writer.close() 
    } 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Prateek_Agrawal_task1").setMaster("local[2]") 
    val sc = new SparkContext(conf) 

    val file = "/Users/echoesofconc/Documents/USC_courses/INF553/ml-latest-small/ratings.csv" 
    val test = "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/testing_small.csv" 

// val file = "/Users/echoesofconc/Documents/USC_courses/INF553/ml-20m/ratings.csv" 
// val test = "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/testing_20m.csv" 

    val data = sc.textFile(file, 2).persist() 
    val data_test = sc.textFile(test, 2).persist() 

    // Drop Header 
    val data_wo_header=dropheader(data) 
    val data_test_wo_header=dropheader(data_test) 
    // Create training and testing data of the format (User ID, MovID, Rating, Time) 

    val ratings_train=create_training(data_wo_header,data_test_wo_header,sc).persist() 
    val ratings_test=create_test(data_wo_header,data_test_wo_header,sc) 
// val ratings_test=create_test(data_wo_header,data_test_wo_header,sc) 


// data_test_wo_header.unpersist() 
// data_test.unpersist() 
//// data.unpersist() 
//// data_test.unpersist() 
    // Create the model using ratings_train, the training data 
    val rank = 1 
    val numIterations = 10 
    val model=create_model(ratings_train,rank,numIterations) 




    // ratings_train.unpersist() 
    // model.save(sc, "target/tmp/myCollaborativeFilter") 
    // val Model = MatrixFactorizationModel.load(sc, "/Users/echoesofconc/myCollaborativeFilter") 

    // Per-user average rating (computed here from the test split) as a fallback for unpredicted pairs 
    val user_avgrat = ratings_test
      .map(_.split(",") match { case Array(user, mov, rate, temp) => (user.toInt, (rate.toDouble, 1.0)) })
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, count) => sum / count }
    //data 
    // Predict user_mov ratings 
    val user_mov = data_test_wo_header.map(_.split(',') match { case Array(user, mov) => 
     (user.toInt,mov.toInt) 
    }) 

    val predictions = 
     model.predict(user_mov).map { case Rating(user, mov, rate) => 
     ((user, mov), rate) 
     } 
    // Combine the predictions with the (user, movie) pairs the model could not predict. Unpredicted pairs get a 0.0 placeholder for now; the fallback accuracy for these should be improved going forward. 
    val user_mov_rat=user_mov.map(x=>(x,0.0)) 
    val predictions_unpredicted_combined= predictions.union(user_mov_rat).reduceByKey(_+_).map(x=>(x._1._1,(x._1._2,x._2))) 
    // Combine average rating and predictions+unpredicted values 
    val avg_rating_predictions_unpredicted_combined=predictions_unpredicted_combined.join(user_avgrat) 
    // Generate final predictions RDD 
    // Use the user's average rating wherever the model produced no prediction (0.0 placeholder)
    val final_predictions = avg_rating_predictions_unpredicted_combined.map {
      case (user, ((mov, pred), avg)) =>
        ((user, mov), if (pred == 0.0) avg else pred)
    }
    // Adjust for ratings above 5.0 and below 0.0 
    val final_predictions_adjusted=final_predictions.map{x=> 
     var temp=x 
     if (x._2>5.0){temp=(x._1,5.0)} 
     if (x._2<0.0){temp=(x._1,0.0)} 
     temp 
    } 
    val ratesAndPreds = ratings_test.map(_.split(",") match { case Array(user, mov, rate, temp) => ((user.toInt,mov.toInt),rate.toDouble)}).join(final_predictions_adjusted) 

    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => 
     val err = (r1 - r2) 
     err * err 
    }.mean() 
    val RMSE=math.sqrt(MSE) 
    // Print output.txt 
    print_outputfile(final_predictions_adjusted) 
    // Print the prediction results 
    print_results(final_predictions_adjusted.sortByKey()) 
    print(RMSE+"\n") 
    } 
} 

Is this the complete code? Looks interesting, I will download the dataset and try it out. Are you running on a cluster or on a local machine? What happens when you run the large dataset? Do you get any memory exceptions? – Tawkir


By the way, the link to the ratings.csv file does not work – Tawkir


I think you are using this dataset: https://grouplens.org/datasets/movielens/, right? – Tawkir

Answer


This works fine. It uses a key-based join (subtractByKey) to create the testing and training data.
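The core idea, shown in the minimal sketch below (assuming an active SparkContext sc; the sample values are made up): key both datasets by (UserID, MovieID), and subtractByKey then runs as a distributed anti-join instead of scanning a collected array once per record.

    // Ratings keyed by (user, movie); the value carries (rating, timestamp).
    val ratings = sc.parallelize(Seq(
      (("1", "10"), ("4.0", "111")),
      (("2", "20"), ("3.5", "222"))))
    // Test pairs keyed the same way; the value is just a marker.
    val testKeys = sc.parallelize(Seq((("2", "20"), 1)))
    // Training = ratings whose key does NOT appear in the test set.
    val train = ratings.subtractByKey(testKeys) // keeps (("1","10"), ...)
    // Test = ratings whose key DOES appear (ratings minus training).
    val test = ratings.subtractByKey(train)    // keeps (("2","20"), ...)

Unlike the broadcast/indexOf version, this never materialises the test set on the driver, and the per-record work after the shuffle is a key lookup rather than a linear scan, which is why it scales to the 20M file. The full code: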

/** 
    * Created by echoesofconc on 3/8/17. 
    */ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.Rating 
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel 
import java.io._ 



object Prateek_Agrawal_task1 { 

    def dropheader(data: RDD[String]): RDD[String] = { 
    data.mapPartitionsWithIndex((idx, lines) => { 
     if (idx == 0) { 
     lines.drop(1) 
     } 
     lines 
    }) 
    } 

    def create_training(ratings_split: RDD[Array[String]], ratings_testing: Array[Array[String]]) = { 
    ratings_split.filter(x => { 
     ratings_testing.exists(y => 
     (x(0) == y(0) && x(1) == y(1)) 
    ) == false 
    }) 
    } 
    def create_testing(ratings_split: RDD[Array[String]], ratings_testing: Array[Array[String]]) = { 
    ratings_split.filter(x => { 
     ratings_testing.exists(y => 
     (x(0) == y(0) && x(1) == y(1)) 
    ) == true 
    }) 
    } 
    def create_model(ratings_train:RDD[((String, String), (String, String))],rank:Int,numIterations:Int):org.apache.spark.mllib.recommendation.MatrixFactorizationModel={ 
    val ratings = ratings_train.map(_ match { case ((user,item),(rate,temp)) => 
     Rating(user.toInt, item.toInt, rate.toDouble) 
    }) 
    val model = ALS.train(ratings, rank, numIterations, 0.01) 
    return model 
    } 

    def print_results(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
    val rating_range=final_predictions_adjusted.map(x=>(x._2.toInt,1)).reduceByKey(_+_).sortByKey() 
    val rating_range_till_4=rating_range.map{x=> 
     var temp=x 
     if (x._1==5){temp=(4,x._2)} 
     temp 
    }.reduceByKey(_+_) 
    rating_range_till_4.sortByKey().foreach { x => 
     if(x._1==0) 
     printf(">=0 and <1: " + x._2+"\n") 
     if(x._1==1) 
     printf(">=1 and <2: " + x._2+"\n") 
     if(x._1==2) 
     printf(">=2 and <3: " + x._2+"\n") 
     if(x._1==3) 
     printf(">=3 and <4: " + x._2+"\n") 
     if(x._1==4) 
     printf(">=4 " + x._2+"\n") 
     if(x._1==5) 
     printf("=5 " + x._2+"\n") 
    } 
    } 
    case class User_mov_rat(UserID: Int, MovieID:Int, Pred_rating: Double) 

    def print_outputfile(final_predictions_adjusted:RDD[((Int, Int), Double)])={ 
    val writer = new FileWriter(new File("./output.txt")) 
    writer.write("UserID,MovieID,Pred_rating\n") 
    final_predictions_adjusted.collect().foreach(x=>{writer.write(x._1._1+","+x._1._2+","+x._2+"\n")}) 
    writer.close() 
    } 

    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf().setAppName("Prateek_Agrawal_task1").setMaster("local[2]") 
    val sc = new SparkContext(conf) 

// val file = "/Users/echoesofconc/Documents/USC_courses/INF553/ml-latest-small/ratings.csv" 
// val test = "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/testing_small.csv" 

    val file = "/Users/echoesofconc/Documents/USC_courses/INF553/ml-20m/ratings.csv" 
    val test = "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/testing_20m.csv" 

    val data = sc.textFile(file, 2).cache() 
    val data_test = sc.textFile(test, 2).cache() 

// Drop Header 
// val data_wo_header=dropheader(data).persist() 
// val data_test_wo_header=dropheader(data_test).persist() 
// Create training and testing data of the format (User ID, MovID, Rating, Time) 
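// Key every rating by (UserID, MovieID) so the test pairs can be removed with
// subtractByKey, a shuffle-based anti-join that avoids collecting the test set
// to the driver.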

    val data_wo_header=dropheader(data).map(_.split(",")).map(x=>((x(0),x(1)),(x(2),x(3)))) 
    val data_test_wo_header=dropheader(data_test).map(_.split(",")).map(x=>((x(0),x(1)),1)) 
    val ratings_train=data_wo_header.subtractByKey(data_test_wo_header) 
    val ratings_test=data_wo_header.subtractByKey(ratings_train) 

    data_test_wo_header.unpersist() 
    data_wo_header.unpersist() 
    data.unpersist() 
    data_test.unpersist() 

// val ratings_split = data_wo_header.map(line => line.split(",")).persist() 
// data_wo_header.unpersist() 
// data.unpersist() 
// val ratings_testing = data_test_wo_header.map(line => line.split(",")).collect() 
// data_test_wo_header.unpersist() 
// data_test.unpersist() 
// 
// val ratings_train = create_training(ratings_split, ratings_testing).persist() 
// val ratings_test=create_testing(ratings_split, ratings_testing) 
// ratings_split.unpersist() 
// ratings_test.unpersist() 

// Create the model using ratings_train, the training data 
    val rank = 1 
    val numIterations = 10 

// val model=create_model(ratings_train,rank,numIterations) 
// 
// model.save(sc, "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/myCollaborativeFilter") 
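// The model was trained and saved once using the commented lines above; this run reloads it from disk.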
    val model = MatrixFactorizationModel.load(sc, "/Users/echoesofconc/Documents/USC_courses/INF553/Prateek_Agrawal_hw3/myCollaborativeFilter") 

// Per-user average rating from the training data, used as a fallback for unpredicted pairs 
    val user_avgrat = ratings_train
      .map { case ((user, mov), (rate, temp)) => (user.toInt, (rate.toDouble, 1.0)) }
      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
      .mapValues { case (sum, count) => sum / count }
    ratings_train.unpersist() 
// Predict user_mov ratings 
    val user_mov = data_test_wo_header.map(_ match { case ((user, mov),temp) => 
     (user.toInt,mov.toInt) 
    }) 

    val predictions = 
    model.predict(user_mov).map { case Rating(user, mov, rate) => 
     ((user, mov), rate) 
    } 

// Combine the predictions with the (user, movie) pairs the model could not predict. Unpredicted pairs get a 0.0 placeholder for now; the fallback accuracy for these should be improved going forward. 
    val user_mov_rat=user_mov.map(x=>(x,0.0)) 
    val predictions_unpredicted_combined= predictions.union(user_mov_rat).reduceByKey(_+_).map(x=>(x._1._1,(x._1._2,x._2))) 
// Combine average rating and predictions+unpredicted values 
    val avg_rating_predictions_unpredicted_combined=predictions_unpredicted_combined.join(user_avgrat) 
// Generate final predictions RDD 
    // Use the user's average rating wherever the model produced no prediction (0.0 placeholder)
    val final_predictions = avg_rating_predictions_unpredicted_combined.map {
      case (user, ((mov, pred), avg)) =>
        ((user, mov), if (pred == 0.0) avg else pred)
    }
// Adjust for ratings above 5.0 and below 0.0 
    val final_predictions_adjusted=final_predictions.map{x=> 
     var temp=x 
     if (x._2>5.0){temp=(x._1,5.0)} 
     if (x._2<0.0){temp=(x._1,0.0)} 
     temp 
    } 
// final_predictions_adjusted.count() 
    val ratesAndPreds_map = ratings_test.map(_ match { case ((user, mov), (rate, temp)) => ((user.toInt,mov.toInt),rate.toDouble)}) 
    val ratesAndPreds=ratesAndPreds_map.join(final_predictions_adjusted) 
    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) => 
     val err = (r1 - r2) 
     err * err 
    }.mean() 
    val RMSE=math.sqrt(MSE) 
// Print output.txt 
    print_outputfile(final_predictions_adjusted) 
// Print the prediction results 
    print_results(final_predictions_adjusted.sortByKey()) 
    print(RMSE+"\n") 
    } 
}