0
当监控结构化流传输的StreamingQueryListener,我发现复制在onQueryProgress火花onQueryProgress复制
override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if(queryProgress.progress.numInputRows!=0) {
println("Query made progress: " + queryProgress.progress)
}
结果是
Query made progress: {
"id" : "e76a8789-738c-49f6-b7f4-d85356c28600",
"runId" : "d8ce0fad-db38-4566-9198-90169efeb2d8",
"name" : null,
"timestamp" : "2017-08-15T07:28:27.077Z",
"numInputRows" : 1,
"processedRowsPerSecond" : 0.3050640634533252,
"durationMs" : {
"addBatch" : 2452,
"getBatch" : 461,
"queryPlanning" : 276,
"triggerExecution" : 3278
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[test1]]",
"startOffset" : {
"test1" : {
"0" : 19
}
},
"endOffset" : {
"test1" : {
"0" : 20
}
},
"numInputRows" : 1,
"processedRowsPerSecond" : 0.3050640634533252
} ],
"sink" : {
"description" : "[email protected]"
}
}
Query made progress: {
"id" : "a5b1f905-5575-43a7-afe9-dead0e4de2a7",
"runId" : "8caea640-8772-4aab-ab13-84c1e952fb77",
"name" : null,
"timestamp" : "2017-08-15T07:28:27.075Z",
"numInputRows" : 1,
"processedRowsPerSecond" : 0.272108843537415,
"durationMs" : {
"addBatch" : 2844,
"getBatch" : 445,
"queryPlanning" : 293,
"triggerExecution" : 3672
},
"stateOperators" : [ ],
"sources" : [ {
"description" : "KafkaSource[Subscribe[test1]]",
"startOffset" : {
"test1" : {
"0" : 19
}
},
"endOffset" : {
"test1" : {
"0" : 20
}
},
"numInputRows" : 1,
"processedRowsPerSecond" : 0.272108843537415
} ],
"sink" : {
"description" : "[email protected]"
}
}
为什么我发送1个消息,那么它有两种不同的结果。
ps: 1.我的主程序问题是我应该用spark来调用数据每隔5分钟,比如00:00-00:05,00:05-00:10等等。每天有288点CAL 2.so我的想法是使用结构化数据流过滤特定的数据和没有过滤器来存储数据库,下一次读数据库和结构化数据流一起
3.so我应该倾听每批量更新我的时间来阅读数据库。
哦,谢谢,我有两个writestream。如何给一个特定的流添加一个监听器 – Aaron