2016-03-03 154 views
0

我想弄清楚如何限制会话的最大长度。在做触发时。我现在的触发看起来是这样的:使用触发器限制窗口的最大长度

return AfterEach.inOrder(
       // speculatively trigger 
       Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval).orFinally(AfterWatermark.pastEndOfWindow())), 
       // finally trigger for late 
       Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval))); 

这工作得很好,因为它触发每earlyFiringInterval时间单位,直到水印经过窗口的末尾,然后它会触发每lateFiringInterval时间单位不止于此。

不幸的是,会话可能会持续数天,这会导致窗口长时间保持打开状态,并导致水印滞留。我想建立一个触发器,它可以“切割”的窗口,这样可以:

  • 没有会话可以比一些maxSessionLength时间(事件时间)更长。
  • 或者,将会话限制在窗格中的某些maxSessionLength事件数量。 - 这是在积累模式下运作的。 (不理想)

所以,到目前为止,我有:

return AfterEach.inOrder(
       Repeatedly 
         // speculatively trigger at every 'earlyFiringInterval' 
         .forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval) 
         // terminate trigger when any of the following conditions are met: 
         // * We have collected either 'maxEventCount' events in the pane 
         // * Watermark has passed the window 
         .orFinally(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventCount), AfterWatermark.pastEndOfWindow()))), 
       Repeatedly 
         // trigger for late data at every 'lateFiringInterval' 
         .forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval))) 
         .orFinally(AfterPane.elementCountAtLeast(maxEventCount)); 

我想知道如果这是要走或者有更好的方法可以做到“限制窗口大小”的方式。

回答

3

您可以允许水印通过指定前进,同时保持你的会话完全保真的OutputTimeFn像这样:

Window.into(Sessions.withGapDuration(...)) 
     .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()) 

就像一个CombineFn决定是从分组输出转换(我们可以将GroupByKey视为经由级联组合),OutputTimeFn确定分组变换的输出的时间戳

的SDK提供了一些常见的选择:

  • OutputTimeFns.outputAtEndOfWindow()
  • OutputTimeFns.outputAtEarliestInputTimestamp()
  • OutputTimeFns.outputAtLatestInputTimestamp()

默认今天是outputAtEarliestInputTimestamp(),它允许在什么时间戳可以方面最大的灵活性适用于下游生产的元素,但不幸的是以维持水的(必要的)成本标记。

如果您不打算在窗口内的时间戳上明确输出,则选择outputAtEndOfWindow()将允许水印尽可能快地前进。

:此功能被标记Experimental。这意味着它的API可能会改变(例如,而不是接受任意的OutputTimeFn实现,它可能仅限于几个固定的常量)。这个概念几乎肯定会保留下来,因为我们总是需要决定分组转换输出的时间戳。

如果您仍然希望因为其他原因而将会话截断,请进行评论,然后我将详细说明其他选项。

顺便说一句,我强烈建议简化触发语法我们现在提供:

AfterWatermark.pastEndOfWindow() 
    .withEarlyFirings(
     AfterProcessingTime.pastFirstElementInPane() 
      .plusDelayOf(earlyFiringInterval)) 
    .withLateFirings(
     AfterProcessingTime.pastFirstElementInPane() 
      .plusDelayOf(lateFiringInterval)) 
+0

你的意思是说,'AfterEach.repeatedly(...)'触发的问题和一个你在帖子末尾提到的内容是否等同? –

+1

我确实希望通过'outputTimeFns'(尽管有用)选项切断会话,但这并不能帮助我找出一些退化情况,其中一些会话可能持续多天(通常是客户端错误)的会话可能会持续水印回来几天。 –

+0

谢谢。我会尝试一下。有没有办法根据最大窗口/会话持续时间进行限制? –