2015-05-21 28 views
2

我是Spark Streaming的新手。我需要使用动态数据集中的数据来丰富来自流的事件。我在创建动态数据集时遇到了问题。这个数据集应该被来自不同流的数据摄取(但是这个数据流将比主流事件的吞吐量低得多)。另外数据集的大小约为1-3GB,所以使用简单的HashMap将不够用​​(在我看来)。加入DStream动态数据集

火花流编程指南,我发现:

val dataset: RDD[String, String] = ... 
val windowedStream = stream.window(Seconds(20))... 
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) } 

和解释:“其实,你也可以动态地改变你想加入对数据集。”这部分我完全不理解 - RDD如何动态更改?它不是一成不变的吗?

下面你可以看到我的代码。关键是将myStream中的每个新RDD添加到myDataset中,但显然这并不适用于我希望这样工作的方式。

val ssc = new StreamingContext(conf, Seconds(5)) 
val myDataset: RDD[String] = ssc.sparkContext.emptyRDD[String] 
val myStream = ssc.socketTextStream("localhost", 9997) 
lines7.foreachRDD(rdd => {myDataset.union(rdd)}) 
myDataset.foreach(println) 

我将不胜感激任何帮助或建议。 关心!

回答

1

是的,RDD是不可变的。你的代码有一个问题,union()返回一个新的RDD,它不会改变现有的RDD。

的编程指南说以下内容:

其实,你也可以动态地改变你想加入 对数据集。提供给transform功能被评估每批次 间隔,因此将使用当前数据集dataset 参考点。

第一句如下可能读更好:

其实,你也可以动态地改变你想加入 对数据集。

因此,我们可以更改dataset引用的RDD,但不能更改RDD本身。这里有一个如何这可能是工作(使用Python)的例子:

# Run as follows: 
# $ spark-submit ./match_ips_streaming_simple.py.py 2> err 
# In another window run: 
# $ nc -lk 9999 
# Then enter IP addresses separated by spaces into the nc window 
from pyspark import SparkContext 
from pyspark.streaming import StreamingContext 
import time 

BATCH_INTERVAL = 2 
SLEEP_INTERVAL = 8 
sc  = SparkContext("local[*]", "IP-Matcher") 
ssc  = StreamingContext(sc, BATCH_INTERVAL) 
ips_rdd = sc.parallelize(set()) 
lines_ds = ssc.socketTextStream("localhost", 9999) 
# split each line into IPs 
ips_ds = lines_ds.flatMap(lambda line: line.split(" ")) 
pairs_ds = ips_ds.map(lambda ip: (ip, 1)) 
# join with the IPs RDD 
matches_ds = pairs_ds.transform(lambda rdd: rdd.join(ips_rdd)) 
matches_ds.pprint() 
ssc.start() 

# alternate between two sets of IP addresses for the RDD 
IP_FILES = ('ip_file1.txt', 'ip_file2.txt') 
file_index = 0 
while True: 
     with open(IP_FILES[file_index]) as f: 
       ips = f.read().splitlines() 
     ips_rdd = sc.parallelize(ips).map(lambda ip: (ip, 1)) 
     print "using", IP_FILES[file_index] 
     file_index = (file_index + 1) % len(IP_FILES) 
     time.sleep(SLEEP_INTERVAL) 
#ssc.awaitTermination() 

while循环,我改变RDD是ips_rdd引用每8秒。该join()改造将使用任何RDD是ips_rdd目前引用。

$ cat ip_file1.txt 
1.2.3.4 
10.20.30.40 
$ cat ip_file2.txt 
5.6.7.8 
50.60.70.80 

$ spark-submit ./match_ips_streaming_simple.py 2> err 
using ip_file1.txt 
------------------------------------------- 
Time: 2015-09-09 17:18:20 
------------------------------------------- 

------------------------------------------- 
Time: 2015-09-09 17:18:22 
------------------------------------------- 

------------------------------------------- 
Time: 2015-09-09 17:18:24 
------------------------------------------- 
('1.2.3.4', (1, 1)) 
('10.20.30.40', (1, 1)) 

using ip_file2.txt 
------------------------------------------- 
Time: 2015-09-09 17:18:26 
------------------------------------------- 

------------------------------------------- 
Time: 2015-09-09 17:18:28 
------------------------------------------- 
('50.60.70.80', (1, 1)) 
('5.6.7.8', (1, 1)) 
... 

虽然上面的作业运行,在另一个窗口:

$ nc -lk 9999 
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8 
<... wait for the other RDD to load ...> 
1.2.3.4 50.60.70.80 10.20.30.40 5.6.7.8