2016-11-04 47 views
2

在键控流上,我希望在新事件到达时立即为每个新传入事件计算一次窗口函数,并在过去30天内为其提供此密钥的所有早期事件的上下文,一个迭代器。Flink Streaming:如何处理每个事件与过去30天的所有事件?

预期的行为类似于30天长度和1纳秒幻灯片的滑动窗口,每传入事件只计算一次窗口函数。

我看不出如何映射在内置翻滚/滑动/会话窗口与/之上没有触发器/逐出器这种行为等

任何人可以帮助?或者这是否需要编写我自己的Window Assigner或我自己的键控状态处理?

回答

3

你说得对,用Flink提供的窗口原语来模拟你的用例并不容易。

我能想到的最佳解决方案是实现自定义运算符(OneInputStreamOperator)。这是一个相当低级的界面,可以访问记录时间戳,水印和状态(许多Flink的内置操作符都基于该界面)。当接收到新记录时,操作员会将其放入优先级队列中,该队列按时间戳排序,移除30天前的所有元素,并对队列中其余元素评估该功能。

请注意,应将队列注册为受管状态以使操作员容错。如果您想使用事件时间,则只能在收到水印时执行评估和丢弃数据。

当实现OneInputStreamOperator接口时,可能有助于查看Flink的内置运算符,例如StreamFilter或其中一个更复杂的运算符。

可以使用transform()方法将定制操作员应用于DataStreamKeyedStream(由DataStream.keyBy()获得)。

+0

谢谢你指点我正确的方向。添加强大的自定义转换非常简单,给我留下了深刻的印象。下面是我如何实现解决方案的一些细节(现在跳过托管状态的事情):使用OneInputStreamOperator扩展AbstractStreamOperator(请参阅StreamFilter for example方法实现),然后通过transform将您的自定义类实例应用到流(... )。顺便说一句:这个用例在例如行为分析,根据客户id进行分组,并且希望根据历史记录立即对每个新事件做出反应。 –

+0

感谢您的评论。我将我的回答扩展到了'transform()'方法和'StreamFilter'类。 –

+0

@cubiclettuce请分享示例代码,如果你已经解决了这个问题。我必须完成相同的任务。提前致谢。 –

相关问题