2017-02-11 42 views
1

基本上,我使用单个Spark Streaming消费者[直接方法]从多个kafka主题使用数据。如何将RDT数量的DStream转换为单个RDD

val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2) 

批处理间隔是30 Seconds

我在这里得到了几个问题。

  1. 当我在DStream上调用foreachRDD时,DStream是否包含多个RDD而不是单个RDD?每个主题都会创建单独的RDD?
  2. 如果是,我想将所有的RDD联合到单个RDD,然后处理数据。我怎么做?
  3. 如果我的处理时间超过批处理间隔,DStream是否会包含多个RDD?

我试图联合DStream RDDs到单个RDD使用下面的方式。首先是我的理解正确吗?如果DStream总是返回单个RDD,则下面的代码不是必需的。

示例代码:

var dStreamRDDList = new ListBuffer[RDD[String]] 
dStream.foreachRDD(rdd => 
     { 
      dStreamRDDList += rdd 
     }) 
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache() 

//THEN PROCESS USING joinedRDD 
//Convert joinedRDD to DF, then apply aggregate operations using DF API. 

回答

1

请问DSTREAM包含多个RDD的,而不是单RDD当我打电话foreachRDD上DSTREAM?每个主题将创建单独的RDD?

不会。即使您有多个主题,在任何给定的批处理间隔内都会有一个RDD。

如果我的处理时间超过批处理间隔,DStream是否会包含多个RDD?

不,如果您的处理时间比批处理间隔长,那么所有将要完成的工作是读取主题偏移量。下一个批次的处理只有在前一个作业完成后才会开始。

作为一个方面说明,确保你真正需要使用foreachRDD,或者如果可能you're misusing the DStream API(免责声明:我是帖子的作者)

+0

谢谢,我会读您的文章和回来...: ) – Shankar

相关问题