我试图过滤出在给定(跳跃)时间窗口中长度为T
的密钥出现频率高于阈值N
的任何消息。Kafka Streams - 过滤在时间窗口中频繁出现的消息
例如,下面的流中:
#time, key
0, A
1, B
2, A
3, C
4, D
5, A
6, B
7, C
8, C
9, D
10, A
11, D
12, D
13, D
14, D
15, D
和N=2
和T=3
,结果应该是
0, A
2, A
7, C
8, C
9, D
11, D
12, D
13, D
14, D
15, D
可替代地,如果上述是不可能的,简化是只在满足阈值后过滤消息:
#time, key
2, A
8, C
11, D
12, D
13, D
14, D
15, D
Kafka Streams可能吗?
到目前为止,我已经尝试创建流的windowed count
(KTable
的实例)并将其连接回原始流。我使用KTable#toStream((k,v) -> k.key())
将windowed count
的密钥更改回原始密钥,并将dummy aggregation更改回KTable
的实例。这似乎会引起延迟,导致leftJoin
错过超过阈值后非常接近的消息。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, Long> wcount = source.groupByKey()
.count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts")
.toStream((k,v) -> k.key());
// perform dummy aggregation to get KTable
KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde)
.reduce((aggValue, newValue) -> newValue,
"dummy-aggregation-store");
// left join and filter with threshold N=1
source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
我还试图执行KStream
- KStream
加入与适当的窗口(离开了虚设聚合):
source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde)
.filter((k,v) -> v!=null)
.filter((k,v) -> v>1)
.print("output");
因为每个UPSERT
成wcount
这导致重复的输出触发事件。
谢谢,我得到了这个工作(几乎)根据需要。对于[翻滚窗口](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing),除了在新时间窗口开始时的事件(当相关事件在前面的窗口)。对于跳频窗口,由于多个窗口可以包含单个触发事件序列,因此输出流中会有重复。所以我选择使用Processor API和窗口化持久存储来实现解决方案。 –