2017-09-21 29 views
0

我已经使用spring-kafka lib实施了Kafka消费者。 我有一个2分区的卡夫卡主题,我也使用ConcurrentKafkaListenerContainerFactory并发级别设置为2,因为每个容器实例应根据spring-kafka documentation从单个分区消耗。Spring-kafka听众协会

KafkaMessageListenerContainer接收来自单个线程上所有 主题/分区的所有消息。 ConcurrentMessageListenerContainer代表1个或多个 KafkaMessageListenerContainer以提供多线程消耗。

还有就是我的消费类:

@Component 
public class KafkaConsumer { 
    private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>(); 

    @KafkaListener(topics = "${kafka.topic}", groupId = "events_group") 
    public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException { 
     String message = record.value().toString(); 
     Event event = EventFactory.createEvent(message); 
     String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID); 
     // add event to hashMap 
     LinkedBlockingQueue<Event> queue = hashMap.get(customerId); 
     if (queue == null) { 
      queue = new LinkedBlockingQueue<>(); 
      queue.add(event); 
      hashMap.put(customerId, queue); 
     } else { 
      queue.add(event); 
     } 
    } 
} 

正如你看到的我有“的HashMap”在收集我把我的事件,基于消息的相应的队列“CUSTOMER_ID”属性。 这种功能在多线程访问的情况下需要额外的同步,正如我看到的,spring-kafka只为所有容器创建一个bean实例,而不是为每个容器创建单独的bean实例,以避免并发问题。

如何以编程方式更改此逻辑?

我看到修复这个问题的唯一奇怪的方法是使用两个JVM运行一个单独的应用程序与单线程使用者,因此使用#receive方法访问KafkaConsumer类将是单线程的。

回答

1

这是正确的。这是如何工作的。框架并不依赖于bean,而仅仅是它的方法来传递消息给函数。

您可能会考虑为您的主题中的每个分区设置两个@KafkaListener方法。确实,来自一个分区的记录通过单个线程传送到@KafkaListener。所以,如果你真的不能忍受这种状态,你可以为每个线程使用两个HashMap

聆听者抽象背后的一般想法正是大约无状态行为。那KafkaConsumer是普通的Spring 单身人士 bean。你必须接受这一事实,并根据这种情况重新设计你的解决方案。

+0

thx解释 – MeetJoeBlack