我有一个结构化的流式设置,运行良好,但我希望在运行时监控它。监控结构化流式传输
我已经建立了一个EventCollector
class EventCollector extends StreamingQueryListener{
override def onQueryStarted(event: QueryStartedEvent): Unit = {
println("Start")
}
override def onQueryProgress(event: QueryProgressEvent): Unit = {
println(event.queryStatus.prettyJson)
}
override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
println("Term")
}
我已经建立了一个EventCollector并添加侦听到我的火花会议
val listener = new EventCollector()
spark.streams.addListener(listener)
然后我关火查询
val query = inputDF.writeStream
//.format("console")
.queryName("Stream")
.foreach(writer)
.start()
query.awaitTermination()
然而,onQueryProgress永远不会被击中。 onQueryStarted的确如此,但我希望以特定的时间间隔获取查询的进度,以监控查询的执行情况。任何人都可以协助吗?
继最新的Spark Summit之后。我们了解到,我们正在寻找的一些信息可以在火花检查点文件中找到。 –