2017-05-04 153 views
2

我有一个情况我需要检查一个特定的消息是否已经存在一个主题或不在,我需要的主题绝对没有重复。阿帕奇卡夫卡:检查消息的存在,在主题

任何一个可以建议这样做,而不是消耗所有的消息和检查,对其中任何优雅的方式。

回答

0

我不认为自己在卡夫卡的专家,但我认为你假装什么是“反对”卡夫卡的本质。

但是我出来使用Java的卡夫卡流库的解决方案。基本上,该方法如下:

  • 地图的每个消息到一个新的键值,其中关键是早期密钥和它的值的组合:(key1, message1) -> (key1-message1, message1)

  • 组使用按键消息,作为此操作的结果,您将获得KGroupedStream

  • 应用reduce函数,将值修改为一些自定义值,例如字符串“重复值”。

  • 转换所产生的KTable后减少到KStream并将它推到一个新的卡夫卡主题。

有在前面的解释这么多的假设,我要为了提供一些代码给一些轻:

KStreamBuilder builder = new KStreamBuilder(); 
KStream<String, String> resources = builder.stream("topic-where-the-messages-are-sent"); 

KeyValueMapper<String, String, KeyValue<String,String>> kvMapper = new KeyValueMapper<String, String, KeyValue<String,String>>() { 
    public KeyValue<String, String> apply(String key, String value) { 
     return new KeyValue<String, String>(key + "-" + value, value); 
    } 
}; 

Reducer<String> reducer = new Reducer<String>() { 
    public String apply(String value1, String value2) { 
     return "Duplicated message"; 
    } 
}; 

resources.map(kvMapper) 
    .groupByKey() 
    .reduce(reducer, "test-store-name") 
    .toStream() 
    .to("unique-message-output"); 

KafkaStreams streams = new KafkaStreams(builder, props); 
streams.start(); 

有想法,这可能不是一个最佳的解决方案,也许你不会认为这是解决你的问题的“优雅”方式。

我希望它有帮助。

相关问题