2016-06-07 73 views
1

我有一个Spark数据流进程,它将kafka, 的数据读入DStream。在Spark Streaming中缓存DStream

在我的管道我做两次(陆续):

DStream.foreachRDD(RDD上的转换和插入到目的地)。

(每次我做不同的处理和插入数据到不同的目的地)。

我想知道DStream.cache在从卡夫卡工作中读取数据后会如何?可以做到吗?

该过程现在是否实际上从卡夫卡读取数据两次?

请记住,这是不可能放两个foreachRDDs成一个(因为两条路径有很大的不同,也有有状态的转变存在 - 这需要对DSTREAM被appliend ...)

谢谢您的帮助

+0

Dstream.cache将工作。它在第一次看到某个动作时缓存该流。对于DStream中的后续操作,它使用缓存。 – Knight71

+0

@ Knight71当DStream不再需要时,我还需要设置DStream.unpersist(true),与缓存RDD时一样? –

+0

Dstream数据将在所有操作完成后自动清除,并且基于转换由火花流确定。 – Knight71

回答

3

这里有两种选择:

  • 使用Dstream.cache()的缓存,以纪念底层RDDS。 Spark Streaming将负责在spark.cleaner.ttl配置控制的超时后暂停RDD。

  • 使用额外foreachRDD申请cache()unpersist(false)影响的操作到RDDS在DSTREAM:

如:

val kafkaDStream = ??? 
val targetRDD = kafkaRDD 
         .transformation(...) 
         .transformation(...) 
         ... 
// Right before the lineage fork mark the RDD as cacheable: 
targetRDD.foreachRDD{rdd => rdd.cache(...)} 
targetRDD.foreachRDD{do stuff 1} 
targetRDD.foreachRDD{do stuff 2} 
targetRDD.foreachRDD{rdd => rdd.unpersist(false)} 

请注意,您可以纳入缓存为第一如果这是一个选项do stuff 1陈述。

我更喜欢这个选项,因为它使我能够对缓存生命周期进行细粒度的控制,并且可以在需要时立即清理内容,而不是依赖于ttl。

+0

'''spark.cleaner.ttl'''被删除。这是什么新的财产控制? – okwap

相关问题