我已成功集成代码以将消息从事件集线器中取出并通过spark/spark-streaming处理它们。消息通过时,我正在转向管理状态。这是我正在使用的代码,它大部分是https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.htmlSpark Streaming和Azure事件集线器mapWithState
本质上,它适用于一个虚拟源,它在单个分区上使用单个流但它不适用于工会化窗口流..虽然我可以为每个分区创建一个流的多个实例它有点击败工会和窗口的重点.. +我试图让它工作的方式失败。我有点卡住在何处走了..如果任何人有这将是盛大的任何想法的灵感..
val sparkSession = SparkSession.builder().master("local[2]").config(sparkConfiguration).getOrCreate()
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(10))
streamingContext.checkpoint(inputOptions.checkpointDir)
//derive the stream and window
val eventHubsStream = EventHubsUtils.createUnionStream(streamingContext, eventHubsParameters)
val eventHubsWindowedStream = eventHubsStream.window(Seconds(10))
val initialRDD = sparkSession.sparkContext.parallelize(List(("dummy", 100L), ("source", 32L)))
val stateSpec = StateSpec.function(trackStateFunc _)
.initialState(initialRDD)
.numPartitions(2)
.timeout(Seconds(60))
val eventStream = eventHubsWindowedStream
.map(messageStr => {
//parse teh event
var event = gson.fromJson(new String(messageStr), classOf[Event])
//return a tuble of key/value pair
(event.product_id.toString, 1)
})
val eventStateStream = eventStream.mapWithState(stateSpec)
val stateSnapshotStream = eventStateStream.stateSnapshots()
stateSnapshotStream.print()
stateSnapshotStream.foreachRDD { rdd =>
import sparkSession.implicits._
rdd.toDF("word", "count").registerTempTable("batch_word_count")
}
streamingContext.remember(Minutes(1))
streamingContext
*它不适用于联合化窗口流*什么不工作? –
道歉,实质上是从未调用状态函数。我无法调试到这一点。当我使用示例代码的罚款..当我使用一个单一的流它的罚款..但不是当我使用unionized流或窗口.. –
你尝试在IDE本地调试吗? –