2017-02-22 26 views
2

我有一个主题和多个谓词,每个主题都有一个与之关联的输出主题。我想将每条记录发送给谓词解析为真的所有主题。我使用Luwak来测试记录满足哪些谓词(要使用这个库来评估带有谓词列表的文档,并告诉你哪些谓词匹配 - 即我只调用一次来获得满足谓词的列表)。将消息流式传输到多个主题

我想为此使用卡夫卡流,但似乎没有在KStream上适当的方法(KStream#分支只将记录路由到单个主题)。

一个可能的方法如下:

Stream from master 
Map the values into a format with the original content and the list of matching predicates 
Stream to an intermediate with-matches topic 

For each predicate/output topic 
    Stream from intermediate with-matches topic 
    Filter "does list of matches predicates contain predicate ID" 
    Map the values to just the original content 
    Stream to corresponding output topic 

这种中间话题似乎“笨重”虽然。有更好的建议吗?

我使用:

  • 卡夫卡v0.10.1.1
  • 鲁瓦克V1.4.0

回答

5

你可以简单的适用于平行多个过滤器以相同KStream实例:

KStream stream = ... 

stream.filter(new MyPredicate1()).to("output-topic-1"); 
stream.filter(new MyPredicate2()).to("output-topic-2"); 
stream.filter(new MyPredicate3()).to("output-topic-3"); 
// ... as as many as you need 

每个记录将被发送到每个谓词一次 - 它在概念上是一个broadc适用于所有过滤器,但记录不会被物理复制,因此不存在内存开销。