2017-07-16 114 views
0

我在星火这个数据结构:星火复杂的分组

val df = Seq(
("Package 1", Seq("address1", "address2", "address3")), 
("Package 2", Seq("address3", "address4", "address5", "address6")), 
("Package 3", Seq("address7", "address8")), 
("Package 4", Seq("address9")), 
("Package 5", Seq("address9", "address1")), 
("Package 6", Seq("address10")), 
("Package 7", Seq("address8"))).toDF("Package", "Destinations") 
df.show(20, false) 

我需要找到被视为一起在不同的软件包的所有地址。看起来我无法找到有效实现这一点的方法。我试着组,地图等理想的情况下,给定的df的结果将是

+----+------------------------------------------------------------------------+ 
| Id |        Addresses        | 
+----+------------------------------------------------------------------------+ 
| 1 | [address1, address2, address3, address4, address5, address6, address9] | 
| 2 | [address7, address8]             | 
| 3 | [address10]               | 
+----+------------------------------------------------------------------------+ 

回答

2

考虑使用TreeReducehttps://spark.apache.org/docs/2.1.0/api/java/org/apache/spark/rdd/RDD.html#treeReduce(scala.Function2,%20int)

  • 对于创建的一套外置的sequential操作:

    • 对于每个新的元件阵列例如[address 7address 8] - 迭代通过现有组,以检查是否相交是非空的:如果是的话再加入这些元素到设置

      • 否则创建包含这些元素
    • 一组新的对于combine操作:

      • 对于每一个在合并操作的左侧集: - 遍历所有在右侧设置找到任何非空交点 - 如果找到任何非空的intectction然后组合这两个集合。

注意TreeReduce是较新的命名。 TreeAggregate用于较旧版本的Spark

+0

我知道我问的太多了,但请问您可否包含一个小例子? – twoface88

+0

TreeReduce没有'sequential'或'combine','TreeAggregate'确实 – twoface88