2016-04-08 64 views
3

我们可以在两个DStream之间共享Spark Spark状态吗?两个流之间的Spark Streaming共享状态

基本上我想创建/更新状态使用第一个流和丰富第二个流使用状态。

例如:我修改了StatefulNetworkWordCount示例。我正在使用第一个流来创建状态,并用第一个流的数量来丰富第二个流。

val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) 


val mappingFuncForFirstStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { 
    val sum = one.getOrElse(0) + state.getOption.getOrElse(0) 
    val output = (word, sum) 
    state.update(sum) 

    Some(output) 
} 

val mappingFuncForSecondStream = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { 
    val sum = state.getOption.getOrElse(0) 
    val output = (word, sum) 

    Some(output) 
} 



// first stream 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
    .flatMap(r=>r._2.split(" ")) 
    .map(x => (x, 1)) 
    .mapWithState(StateSpec.function(mappingFuncForFirstStream).initialState(initialRDD).timeout(Minutes(10))) 
    .print(1) 



// second stream 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams2, mergeTopicSet) 
    .flatMap(r=>r._2.split(" ")) 
    .map(x => (x, 1)) 
    .mapWithState(StateSpec.function(mappingFuncForSecondStream).initialState(initialRDD).timeout(Minutes(10))) 
    .print(50) 

在检查点目录中,我可以看到两个不同的状态RDD。

我使用的火花1.6.1和卡夫卡0.8.2.1

回答

-1

这种方法可能对你会有所帮助:

ssc.untion(Seq[Dstream[T]]) 
+0

感谢@ bug_xshguo,我不认为这是一个好主意。我想对这些流执行不同的转换。 – banjara

1

这是可以访问的底层StateDStream应用mapWithState操作所产生的DStream使用stateMappedDStream.snapshotStream()

因此,启发你的例子:

val firstDStream = ??? 
val secondDStream = ??? 
val firstDStreamSMapped = firstDStream..mapWithState(...) 
val firstStreamState = firstDStreamSMapped.snapshotStream() 
// we want to use the state of Stream 1 to enrich Stream 2. The keys of both streams are required to match. 
val enrichedStream = secondDStream.join(firstStreamState) 
... do stuff with enrichedStream ... 
+0

感谢您的回复,这是一个有趣的方法,它也可以工作,但在我的情况下,状态将是巨大的,我不想遍历每个微批次的整个状态。 – banjara

+0

@shekhar所以你想直接破解一个DStream中的StateDStream到第二个?我想你需要深入了解源代码,看看是否可以完成,以及如何完成。 – maasg