2016-03-09 59 views
0

我花了一段时间才弄明白这一点,并且想分享我的解决方案。改进绝对受欢迎。拼合RDD - 键值对中的嵌套列表

参考文献:Flattening a Scala Map in an RDDSpark Flatten Seq by reversing groupby, (i.e. repeat header for each sequence in it)

我有以下形式的RDD:RDD [(中间体,列表[(字符串,列表[(字符串,整型,浮点型)])])]

密钥的:int

值:列表[(字符串,列表[(字符串,整型,浮点型)])]

随着压扁到的一个目标:RDD [(中等,字符串,字符串,整型,浮点型)]

binHostCountByDate.foreach(println) 

给人的例子:

(516361, List((2013-07-15, List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5))), (2013-08-15, List((p.secure.com,1,1.0))))) 

最终RDD应给予以下

(516361,2013-07-15,s2.rf.ru,1,0.5) 
(516361,2013-07-15,s1.rf.ru,1,0.5) 
(516361,2013-08-15,p.secure.com,1,1.0) 

回答

1

这是一个简单的一行(并与解构的换理解,我们可以比_1,_2._1等,这使得它更容易,以确保我们得到正确的结果

// Use a outer list in place of an RDD for test purposes 
val t = List((516361, 
       List(("2013-07-15", List(("s2.rf.ru,",1,0.5), ("s1.rf.ru",1,0.5))), 
         ("2013-08-15", List(("p.secure.com,",1,1.0)))))) 

t flatMap {case (k, xs) => for ((d, ys) <- xs; (dom, a,b) <-ys) yield (k, d, dom, a, b)} 
    //> res0: List[(Int, String, String, Int, Double)] = 
     List((516361,2013-07-15,s2.rf.ru,,1,0.5), 
      (516361,2013-07-15,s1.rf.ru,1,0.5), 
      (516361,2013-08-15,p.secure.com,,1,1.0)) 
更好的名字
0

我的方法如下:

我平坦化第一密钥值对。这“删除”第一个列表。

val binHostCountForDate = binHostCountByDate.flatMapValues(identity) 

给我以下形式的RDD:RDD [(智力,(字符串,列表[(字符串,整数,浮点数)])]

binHostCountForDate.foreach(println) 

(516361,(2013-07-15,List((s2.rf.ru,1,0.5), (s1.rf.ru,1,0.5)))) 
(516361,(2013-08-15,List(p.secure.com,1,1.0)) 

现在,我的前两个项目映射到。一个元组创建一个新的密钥和第二元组作为值然后应用相同的步骤,上述变平关于新的密钥值对

val binDataRemapKey = binHostCountForDate.map(f =>((f._1, f._2._1), f._2._2)).flatMapValues(identity) 

这给出了扁平RDD:RDD [(中间体,字符串) ,(String,Int,Float)]

如果这个表单没有问题,那么我们就完成了,但我们可以更进一步,删除元组以获得我们最初寻找的最终表单。

val binData = binDataRemapKey.map(f => (f._1._1, f._1._2, f._2._1, f._2._2, f._2._3)) 

这给我们的最终形式:RDD [(智力,字符串,字符串,整数,浮点数)

我们现在有一个扁平RDD已保存每个列表的父母。