2016-12-02 83 views
7

我有一个结构化的流式设置,运行良好,但我希望在运行时监控它。监控结构化流式传输

我已经建立了一个EventCollector

class EventCollector extends StreamingQueryListener{ 
    override def onQueryStarted(event: QueryStartedEvent): Unit = { 
    println("Start") 
    } 

    override def onQueryProgress(event: QueryProgressEvent): Unit = { 
    println(event.queryStatus.prettyJson) 
    } 

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = { 
    println("Term") 
    } 

我已经建立了一个EventCollector并添加侦听到我的火花会议

val listener = new EventCollector() 
spark.streams.addListener(listener) 

然后我关火查询

val query = inputDF.writeStream 
    //.format("console") 
    .queryName("Stream") 
    .foreach(writer) 
    .start() 

query.awaitTermination() 

然而,onQueryProgress永远不会被击中。 onQueryStarted的确如此,但我希望以特定的时间间隔获取查询的进度,以监控查询的执行情况。任何人都可以协助吗?

+0

继最新的Spark Summit之后。我们了解到,我们正在寻找的一些信息可以在火花检查点文件中找到。 –

回答

3

很多研究这一主题之后,这是我发现......

OnQueryProgress被查询之间打英寸我不确定这个有意的功能与否,但是当我们从一个文件流式传输数据时,OnQueryProgress并不会触发。

我发现的一个解决方案是依赖于foreach编写器接收器,并在过程函数中执行我自己的性能分析。不幸的是,我们无法访问正在运行的查询的特定信息。或者,我还没有想出如何。这是我在沙箱中已经实现了分析性能:

val writer = new ForeachWriter[rawDataRow] { 
    def open(partitionId: Long, version: Long):Boolean = { 
     //We end up here in between files 
     true 
    } 
    def process(value: rawDataRow) = { 
     counter += 1 

     if(counter % 1000 == 0) { 
      val currentTime = System.nanoTime() 
      val elapsedTime = (currentTime - startTime)/1000000000.0 

      println(s"Records Written: $counter") 
      println(s"Time Elapsed: $elapsedTime seconds") 
     } 
    } 
} 

的另一种方式获得指标:

另一种方式来获取有关正在运行的查询信息是打GET端点火花为我们提供。

http://localhost:4040/metrics

http://localhost:4040/api/v1/

文档浏览:http://spark.apache.org/docs/latest/monitoring.html

更新数2017年9月2日: 测试在正常的火花流,不规整流

免责声明,这可能不适用于结构化流媒体,我需要设置一个测试床来确认。然而,它确实可以处理普通的火花流(在这个例子中从卡夫卡消费)。

我相信,由于Spark Stream 2.2已经发布,存在新的端点可以检索更多关于流性能的指标。这可能在以前的版本中已经存在,我只是错过了它,但我想确保它为任何其他人搜索此信息记录。

http://localhost:4040/api/v1/applications/ {} applicationIdHere /流/统计

这是一个看起来就像是在2.2加入(或已存在的和刚添加的文档,我不知道终点,我没有选中)。

不管怎么说,这增加了在指定此格式的流媒体应用程序指标:

{ 
    "startTime" : "2017-09-13T14:02:28.883GMT", 
    "batchDuration" : 1000, 
    "numReceivers" : 0, 
    "numActiveReceivers" : 0, 
    "numInactiveReceivers" : 0, 
    "numTotalCompletedBatches" : 90379, 
    "numRetainedCompletedBatches" : 1000, 
    "numActiveBatches" : 0, 
    "numProcessedRecords" : 39652167, 
    "numReceivedRecords" : 39652167, 
    "avgInputRate" : 771.722, 
    "avgSchedulingDelay" : 2, 
    "avgProcessingTime" : 85, 
    "avgTotalDelay" : 87 
} 

这让我们建立使用由星火暴露了REST端点自己的自定义指标/监控应用的能力。

+0

你是什么意思的“OnQueryProgress获取命中之间的查询。”?仅供参考,在即将推出的Spark 2.1中,这些事件将在批量运行时发布,并且即使没有新数据发布,也会每隔10秒发布一次。 – zsxwing

+0

嘿,对不起,我的意思是在查询之间调用OnQueryProgress函数。我们想要的是在进行查询时调用OnQueryProgress。这样我们就可以剖析它的表现。这仍然是可能的,但我还没有弄清楚。 –

+1

我明白了。当有大量批量运行时,可能需要很长时间才能完成。 FileStreamSource有一个选项'maxFilesPerTrigger'来限制每个批处理文件的数量。您可以使用它来生成小批量而不是大批量。 – zsxwing