1
基本上,我使用单个Spark Streaming消费者[直接方法]从多个kafka主题使用数据。如何将RDT数量的DStream转换为单个RDD
val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet).map(_._2)
批处理间隔是30 Seconds
。
我在这里得到了几个问题。
- 当我在DStream上调用foreachRDD时,DStream是否包含多个RDD而不是单个RDD?每个主题都会创建单独的RDD?
- 如果是,我想将所有的RDD联合到单个RDD,然后处理数据。我怎么做?
- 如果我的处理时间超过批处理间隔,DStream是否会包含多个RDD?
我试图联合DStream RDDs到单个RDD使用下面的方式。首先是我的理解正确吗?如果DStream总是返回单个RDD,则下面的代码不是必需的。
示例代码:
var dStreamRDDList = new ListBuffer[RDD[String]]
dStream.foreachRDD(rdd =>
{
dStreamRDDList += rdd
})
val joinedRDD = ssc.sparkContext.union(dStreamRDDList).cache()
//THEN PROCESS USING joinedRDD
//Convert joinedRDD to DF, then apply aggregate operations using DF API.
谢谢,我会读您的文章和回来...: ) – Shankar