2017-10-20 58 views
0

我在本地运行Spark并且出现了一个奇怪的问题。基本上,我可以使用DataFrame的show()方法输出任意数量的行,但是,当我尝试使用count()或collect()(甚至是很少量的数据)时,Spark就会停留在该阶段。永远不会完成它的工作。我使用gradle来构建和运行。show()/ count()永远不会完成while show()快速运行

当我运行

./gradlew clean run 

程序卡住在

> Building 83% > :run 

什么会导致这个问题? 这是代码。

val moviesRatingsDF = MongoSpark.load(sc).toDF().select("movieId", "userId","rating") 

    val movieRatingsDF = moviesRatingsDF 
     .groupBy("movieId") 
     .pivot("userId") 
     .max("rating") 
     .na.fill(0) 

    val ratingColumns = movieRatingsDF.columns.drop(1) // drop the name column 

    val movieRatingsDS:Dataset[MovieRatingsVector] = movieRatingsDF 
     .select(col("movieId").as("movie_id"), array(ratingColumns.map(x => col(x)): _*).as("ratings")) 
     .as[MovieRatingsVector] 

    val moviePairs = movieRatingsDS.withColumnRenamed("ratings", "ratings1") 
     .withColumnRenamed("movie_id", "movie_id1") 
     .crossJoin(movieRatingsDS.withColumnRenamed("ratings", "ratings2").withColumnRenamed("movie_id", "movie_id2")) 
     .filter(col("movie_id1") < col("movie_id2")) 

    val movieSimilarities = moviePairs.map(row => { 
     val ratings1 = sc.parallelize(row.getAs[Seq[Double]]("ratings1")) 
     val ratings2 = sc.parallelize(row.getAs[Seq[Double]]("ratings2")) 
     val corr:Double = Statistics.corr(ratings1, ratings2) 

     MovieSimilarity(row.getAs[Long]("movie_id1"), row.getAs[Long]("movie_id2"), corr) 
    }).cache() 


    val collectedData = movieSimilarities.collect() 
    println(collectedData.length) 

    log.warn("I'm done") //never gets here 

    close 

enter image description here

+0

我可能在这里是错的,但我认为你不应该在'moviePairs'数据集的转换中创建新的RDD? (我指的是两个'sc.parallelize(。)') –

+0

我也觉得它有点低效。但是Statistics.corr方法需要一对RDD。如果我有办法将2个向量传递给它,会很好。但这种情况并非如此。无论如何,这个任务看起来很好(如果我之后打印一些东西 - 它不会花费很长时间)。 –

回答

-1

星火确实懒惰的评价,并创建RDD/DF的时候一个动作被调用。

要回答你的问题

1。在收集/计数你调用两个不同的动作,柜面如果你不 坚持数据,这将导致RDD/DF进行重新评估,因此 比预期更多的时间。

  1. 在显示中只有一个动作。它只显示前1000行(手指交叉 ),因此它结束