我是Spark Streaming的新手。我需要使用动态数据集中的数据来丰富来自流的事件。我在创建动态数据集时遇到了问题。这个数据集应该被来自不同流的数据摄取(但是这个数据流将比主流事件的吞吐量低得多)。另外数据集的大小约为1-3GB,所以使用简单的HashMap将不够用(在我看来)。加入DStream动态数据集
火花流编程指南,我发现:
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
和解释:“其实,你也可以动态地改变你想加入对数据集。”这部分我完全不理解 - RDD如何动态更改?它不是一成不变的吗?
下面你可以看到我的代码。关键是将myStream中的每个新RDD添加到myDataset中,但显然这并不适用于我希望这样工作的方式。
val ssc = new StreamingContext(conf, Seconds(5))
val myDataset: RDD[String] = ssc.sparkContext.emptyRDD[String]
val myStream = ssc.socketTextStream("localhost", 9997)
lines7.foreachRDD(rdd => {myDataset.union(rdd)})
myDataset.foreach(println)
我将不胜感激任何帮助或建议。 关心!