2017-05-07 46 views
2

我是新来的发展卡夫卡流应用。我的流处理器旨在根据输入json消息中的用户键的值对json消息进行排序。使用卡夫卡流有条件排序JSON输入流

Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"} 
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"} 
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"} 

我在这里读到Dynamically connecting a Kafka input stream to multiple output streams表示没有动态解决方案。

在我的使用情况,我知道了用户密钥和输出的话题,我需要输入流排序。因此,我正在为每个处理器应用程序匹配不同用户ID的每个用户编写独立的处理器应用程序。从在卡夫卡同一JSON输入主题读取,但每一个只在预设的用户条件被满足写入该信息至一个输出主题为特定用户

所有不同流处理器的应用程序。

public class SwitchStream extends AbstractProcessor<String, String> { 
     @Override 
     public void process(String key, String value) { 
      HashMap<String, String> message = new HashMap<>(); 
      ObjectMapper mapper = new ObjectMapper(); 
      try { 
       message = mapper.readValue(value, HashMap.class); 
      } catch (IOException e){} 

      // User condition UserID = 1 
      if(message.get("UserID").equals("1")) { 
       context().forward(key, value); 
       context().commit(); 
      } 
     } 

     public static void main(String[] args) throws Exception { 
      Properties props = new Properties(); 
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor"); 
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
      props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 
      props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); 

      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 
      TopologyBuilder builder = new TopologyBuilder(); 
      builder.addSource("Source", "INPUT_TOPIC"); 
      builder.addProcessor("Process", SwitchStream::new, "Source"); 
      builder.addSink("Sink", "OUTPUT_TOPIC", "Process"); 

      KafkaStreams streams = new KafkaStreams(builder, props); 
      streams.start(); 
     } 
} 

问题1: 是否可以很容易地实现相同的功能使用高级流DSL替代,如果低级别的处理器API? (我承认我发现它更难理解并遵循其他高级流DSL的在线示例)

问题2: 输入json主题以20K-25K EPS的高速率获得输入。我的处理器应用似乎无法跟上这个输入流。我已经尝试过部署每个进程的多个实例,但结果不在我想要的地方。理想情况下,每个处理器实例应该能够处理3-5K EPS。

有没有改善我的逻辑处理器,或者使用高级流DSL写同样的处理器逻辑的方法吗?这会有所作为吗?

回答

3

您可以通过filter()在高级别DSL中执行此操作(因为只有在userID==1时您才会返回一条消息,因此您可以有效实施过滤器)。你可以概括这个过滤模式,通过使用KStream#branch()(见文档进一步的细节:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)。另请阅读JavaDocs:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams

KStreamBuilder builder = new KStreamBuilder(); 
builder.stream("INPUT_TOPIC") 
     .filter(new Predicate() { 
      @Overwrite 
      boolean test(String key, String value) { 
       // put you processor logic here 
       return message.get("UserID").equals("1") 
      } 
     }) 
     .to("OUTPUT_TOPIC"); 

关于性能。一个实例应该能够处理10K +记录。没有任何进一步的信息很难说出问题的可能性。我建议要问在卡夫卡的用户列表(见http://kafka.apache.org/contact

+0

感谢马蒂亚斯!我以低EPS获得的问题最终与旧版Kafka +环境有关。 – iron3rd