2017-08-20 36 views
1

我需要能够从地图状态中删除比固定时间早的旧密钥。 我目前保持关键状态图中每个事件的时间戳,我想有一个ansyncronous进程将删除这些陈旧的密钥。在Flink Mapstate中删除TTL过期密钥

我使用RocksDB作为状态后端,我不认为RocksDB的Java API支持使用TTL打开的here

所以我的问题是:

  • 是它在所有可能拥有一个具有访问Mapstate因为它在操作功能上运行的异步线程?
  • 在这种情况下是否有更好的做法?

在此先感谢,在弗林克过期状态

回答

2

一个简单的方法是使用一个ProcessFunction运营商保持状态。然后,您可以使用计时器(处理时间计时器或事件时间计时器,具体取决于您的应用程序的意义),并清除onTimer方法中的状态。

+0

Flink的Table API显示了一个状态清理的例子:https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/ flink/table/runtime/aggregate/ProcessFunctionWithCleanupState.scala – twalthr

+0

嗨,感谢您的快速回复。你所建议的解决方案并不差,但我唯一缺少的是ProcessFunction似乎没有以异步方式拍摄OnTimer回调(对吗? – Eliran

+0

对于事件时间计时器,OnTimer回调获取在处理流元素的同一线程中调用,因为它处理触发定时器的水印。对于进程定时器,有一个单独的线程来实现定时器服务。 –