2017-10-19 87 views
0

我试图过滤出在给定(跳跃)时间窗口中长度为T的密钥出现频率高于阈值N的任何消息。Kafka Streams - 过滤在时间窗口中频繁出现的消息

例如,下面的流中:

#time, key 
0, A 
1, B 
2, A 
3, C 
4, D 
5, A 
6, B 
7, C 
8, C 
9, D 
10, A 
11, D 
12, D 
13, D 
14, D 
15, D 

N=2T=3,结果应该是

0, A 
2, A 
7, C 
8, C 
9, D 
11, D 
12, D 
13, D 
14, D 
15, D 

可替代地,如果上述是不可能的,简化是只在满足阈值后过滤消息:

#time, key 
2, A 
8, C 
11, D 
12, D 
13, D 
14, D 
15, D 

Kafka Streams可能吗?

到目前为止,我已经尝试创建流的windowed countKTable的实例)并将其连接回原始流。我使用KTable#toStream((k,v) -> k.key())windowed count的密钥更改回原始密钥,并将dummy aggregation更改回KTable的实例。这似乎会引起延迟,导致leftJoin错过超过阈值后非常接近的消息。

final Serde<String> stringSerde = Serdes.String(); 
    final Serde<Long> longSerde = Serdes.Long(); 

    KStream<String, Long> wcount = source.groupByKey() 
      .count(TimeWindows.of(TimeUnit.SECONDS.toMillis(5)),"Counts") 
      .toStream((k,v) -> k.key()); 

    // perform dummy aggregation to get KTable 
    KTable<String, Long> wcountTable = wcount.groupByKey(stringSerde, longSerde) 
       .reduce((aggValue, newValue) -> newValue, 
       "dummy-aggregation-store"); 

    // left join and filter with threshold N=1 
    source.leftJoin(wcountTable, (leftValue, rightValue) -> rightValue,stringSerde, stringSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

我还试图执行KStream - KStream加入与适当的窗口(离开了虚设聚合):

source.join(wcount, (leftValue, rightValue) -> rightValue, JoinWindows.of(TimeUnit.SECONDS.toMillis(5)),stringSerde, stringSerde, longSerde) 
      .filter((k,v) -> v!=null) 
      .filter((k,v) -> v>1) 
      .print("output"); 

因为每个UPSERTwcount这导致重复的输出触发事件。

回答

1

这当然是可能的。您可以应用一个窗口聚合来收集列表中的所有原始数据(即,您手动实现窗口)。之后,您将应用一个评估窗口的flatMap。如果阈值尚未达到,则不发射任何信号。如果第一次达到阈值,则会发出所有缓冲数据。对于所有进一步调用flatMap的计数大于阈值的情况,您只需发出列表中最新的一个(您知道您之前已发出所有其他调用flatMap的函数,即仅发出新添加的函数)。

注意:您需要禁用KTable缓存,即设置config参数“cache.max.bytes.buffering”= 0。否则,算法将无法正常工作。

事情是这样的:

KStream<Windowed<K>, List<V>> windows = stream.groupByKey() 
               .aggregate(
               /*init with empty list*/, 
               /*add value to list in agg*/, 
               TimeWindows.of()...), 
               ...) 
               .toStream(); 
KStream<K,V> thresholdMetStream = windows.flatMap(
              /* if List#size < threshold 
               then return empty-list, ie, nothing 
               elseif List#size == threshold 
               then return whole list 
               else [List#size > threshold] 
               then return last element from list 
              */); 
+1

谢谢,我得到了这个工作(几乎)根据需要。对于[翻滚窗口](https://kafka.apache.org/0110/documentation/streams/developer-guide#streams_dsl_windowing),除了在新时间窗口开始时的事件(当相关事件在前面的窗口)。对于跳频窗口,由于多个窗口可以包含单个触发事件序列,因此输出流中会有重复。所以我选择使用Processor API和窗口化持久存储来实现解决方案。 –

相关问题