我想弄清楚如何限制会话的最大长度。在做触发时。我现在的触发看起来是这样的:使用触发器限制窗口的最大长度
return AfterEach.inOrder(
// speculatively trigger
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval).orFinally(AfterWatermark.pastEndOfWindow())),
// finally trigger for late
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)));
这工作得很好,因为它触发每earlyFiringInterval
时间单位,直到水印经过窗口的末尾,然后它会触发每lateFiringInterval
时间单位不止于此。
不幸的是,会话可能会持续数天,这会导致窗口长时间保持打开状态,并导致水印滞留。我想建立一个触发器,它可以“切割”的窗口,这样可以:
- 没有会话可以比一些
maxSessionLength
时间(事件时间)更长。 - 或者,将会话限制在窗格中的某些
maxSessionLength
事件数量。 - 这是在积累模式下运作的。 (不理想)
所以,到目前为止,我有:
return AfterEach.inOrder(
Repeatedly
// speculatively trigger at every 'earlyFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFiringInterval)
// terminate trigger when any of the following conditions are met:
// * We have collected either 'maxEventCount' events in the pane
// * Watermark has passed the window
.orFinally(AfterFirst.of(AfterPane.elementCountAtLeast(maxEventCount), AfterWatermark.pastEndOfWindow()))),
Repeatedly
// trigger for late data at every 'lateFiringInterval'
.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(lateFiringInterval)))
.orFinally(AfterPane.elementCountAtLeast(maxEventCount));
我想知道如果这是要走或者有更好的方法可以做到“限制窗口大小”的方式。
你的意思是说,'AfterEach.repeatedly(...)'触发的问题和一个你在帖子末尾提到的内容是否等同? –
我确实希望通过'outputTimeFns'(尽管有用)选项切断会话,但这并不能帮助我找出一些退化情况,其中一些会话可能持续多天(通常是客户端错误)的会话可能会持续水印回来几天。 –
谢谢。我会尝试一下。有没有办法根据最大窗口/会话持续时间进行限制? –