2016-02-29 89 views
0

我正在使用Spark的Streaming API,我只是想更好地了解如何最好地设计代码。Spark Streaming DStream元素vs RDD

我目前使用的卡夫卡消费者(在pyspark)从pyspark.streaming.kafka.createDirectStream

根据http://spark.apache.org/docs/latest/streaming-programming-guide.html

星火流提供了一个名为高级抽象离散 流或DSTREAM ,它代表了连续的数据流。 DStreams可以通过来自输入数据流的输入数据流 (如Kafka,Flume和Kinesis)创建,也可以通过在其他DStream上应用高级别的操作来创建。在内部,DStream被表示为 RDD的序列。

本质上,我想将一组函数应用于DStream中的每个元素。目前,我正在使用pyspark.streaming.DStream的“地图”功能。根据文件,我的方法似乎是正确的。 http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html#pyspark.streaming.DStream

地图(F,preservesPartitioning =假)通过施加 函数DSTREAM的每个元素返回一个新的DSTREAM。

我应该使用map还是正确的方法是将函数/转换应用于RDD(由于DStream使用RDD)?

foreachRDD(func)对此DStream中的每个RDD应用一个函数。

更多文档: http://spark.apache.org/docs/latest/api/python/pyspark.streaming.html

回答

1

DirectStream.map这里是一个正确的选择。继map

stream.map(f) 

相当于:从另一方面

stream.transform(lambda rdd: rdd.map(f)) 

DirectStream.foreachRDD是输出动作,并创建一个输出DStream。与foreachRDD一起使用的函数不会返回任何内容,与方法本身相同。看看Scala签名显然很明显:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit 
+0

感谢您的反馈!我做了一些研究,并研究了文档。实质上,我想通过映射/变换/过滤器​​功能来完成所有的转换/按摩方面。要插入数据库(即一些“侧面功能”),我将利用.foreachRDD –

+0

除了非常简单的应用程序之外,这不是一个非常有用的方法。任何在'foreachRDD'内部应用的转换在退出其关闭时都会丢失。所以这意味着你只能应用“线性”工作流程 – zero323

+0

除了简单的应用程序之外,什么不是一种有用的方法?你会进一步详细说明,不确定你的意思。将数据插入数据库。 –