2016-04-15 59 views
1

我可以执行连接两个星火DStreams像:星火:抗连接两个DStreams

val joinStream = stream1.join(stream2) 

现在,如果我需要过滤掉所有未加入的记录。基本上,就像stream1.anti-join(stream2)。这可能以某种方式吗?

谢谢,感谢任何帮助!

+0

我不知道我明白你什么意思通过反连接 – eliasah

+0

其中有一个共同的核心权利的记录之间的连接发生的呢?我需要来自两个流没有共同的JOIN键的所有记录。 – void

+1

类似于http://2.bp.blogspot.com/-9xB6dMw3mcY/UIGn0glldYI/AAAAAAAAAEo/H8AkcRYvUHk/s1600/sql-left-outer-join-where-table-is-null-or-table-is-null。 png? – eliasah

回答

2

假如你想知道这些:

val rdd1 = sc.parallelize(Array(
    (1, "one"), 
    (2, "twow"), 
    (3, "three"), 
    (4, "four"), 
    (5, "five") 
)) 
val rdd2 = sc.parallelize(Array(
    (1, "otherOne"), 
    (4, "otherFour"), 
    (5,"otherFive"), 
    (6,"six"), 
    (7,"seven") 
)) 

val antiJoined = rdd1.fullOuterJoin(rdd2).filter(r => r._2._1.isEmpty || r._2._2.isEmpty) 

antiJoined.collect foreach println 
(6,(None,Some(six))) 
(2,(Some(twow),None)) 
(3,(Some(three),None)) 
(7,(None,Some(seven)))