我们可以在两个DStream之间共享Spark Spark状态吗?两个流之间的Spark Streaming共享状态
基本上我想创建/更新状态使用第一个流和丰富第二个流使用状态。
例如:我修改了StatefulNetworkWordCount示例。我正在使用第一个流来创建状态,并用第一个流的数量来丰富第二个流。
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
Some(output)
}
val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => {
val sum = state.getOption.getOrElse(0)
val output = (word, sum)
Some(output)
}
// first stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
.flatMap(r=>r._2.split(" "))
.map(x => (x, 1))
.mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10)))
.print(1)
// second stream
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet)
.flatMap(r=>r._2.split(" "))
.map(x => (x, 1))
.mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10)))
.print(50)
在检查点目录中,我可以看到两个不同的状态RDD。
我使用的火花1.6.1和卡夫卡0.8.2.1
感谢@ bug_xshguo,我不认为这是一个好主意。我想对这些流执行不同的转换。 – banjara