2017-05-15 144 views
0

我有大小6000的查找RDD内RDD访问,lookup_rdd:RDD [字符串]火花:另一个RDD

a1 a2 a3 a4 a5 .....

和另一个RDD,data_rdd:RDD [(字符串,可迭代[(字符串, INT)])]:(ID,(项目,计数)),它具有唯一的ID,在lookup_rdd

(id1,List((a1,2), (a3,4))) (id2,List((a2,1), (a4,2), (a1,1))) (id3,List((a5,1)))

FOREACH元素我要检查每个ID是否有该元素与否,如果它的存在我把计数,如果不是我把0,和存储在一个文件中。

什么是实现这一目标的有效方法。哈希可能吗?例如。输出我想要的是:

id1,2,0,4,0,0 id2,1,1,0,2,0 id3,0,0,0,0,1

我已经试过这样:

val headers = lookup_rdd.zipWithIndex().persist() 
val indexing = data_rdd.map{line => 
    val id = line._1 
    val item_cnt_list = line._2 
    val arr = Array.fill[Byte](6000)(0) 
    item_cnt_list.map(c=>(headers.lookup(c._1),c._2)) 
    } 
indexing.collect().foreach(println) 

我得到异常:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations

+0

6000 entires是一个非常小的数据集。考虑收集驱动程序,然后广播 –

回答

1

坏消息是,你不能使用RDD在另一个。

好消息是,对于您的使用情况,假设6000个条目相当小,则存在一个理想的解决方案:在驱动程序上收集RDD,将其广播回集群的每个节点,并在其他RDD,就像你之前做过的那样。

val sc: SparkContext = ??? 
val headers = sc.broadcast(lookup_rdd.zipWithIndex.collect().toMap) 
val indexing = data_rdd.map { case (_, item_cnt_list) => 
    item_cnt_list.map { case (k, v) => (headers.value(k), v) } 
} 
indexing.collect().foreach(println) 
+0

感谢您的答案。有一个类似的情况,但另外..有更新地图功能里面的查找表。对于下一个元素,我必须查找更新的查找表。我明白,我们不能用broadcast来做这件事。请你建议如何解决这个问题。即使链接到资源也会有所帮助。提前致谢。 – Phoenix

+0

我相信你有一个更好的改变,为你的特殊情况创建一个问题,分享相关的代码。没有它,很难说。 – stefanobaghino

+0

已经添加了一个单独的问题:请你看看。 :HTTPS://stackoverflow.com/questions/49125735/loop-through-dataframe-and-update-the-lookup-table-simultaneously-spark-scala – Phoenix