我已经使用spring-kafka lib实施了Kafka消费者。 我有一个2分区的卡夫卡主题,我也使用ConcurrentKafkaListenerContainerFactory
并发级别设置为2,因为每个容器实例应根据spring-kafka documentation从单个分区消耗。Spring-kafka听众协会
KafkaMessageListenerContainer接收来自单个线程上所有 主题/分区的所有消息。 ConcurrentMessageListenerContainer代表1个或多个 KafkaMessageListenerContainer以提供多线程消耗。
还有就是我的消费类:
@Component
public class KafkaConsumer {
private HashMap<String, LinkedBlockingQueue<Event>> hashMap = new HashMap<>();
@KafkaListener(topics = "${kafka.topic}", groupId = "events_group")
public void receive(ConsumerRecord<?, ?> record, Consumer consumer) throws InterruptedException {
String message = record.value().toString();
Event event = EventFactory.createEvent(message);
String customerId = event.getAttributeStringValue(DefinedField.CUSTOMER_ID);
// add event to hashMap
LinkedBlockingQueue<Event> queue = hashMap.get(customerId);
if (queue == null) {
queue = new LinkedBlockingQueue<>();
queue.add(event);
hashMap.put(customerId, queue);
} else {
queue.add(event);
}
}
}
正如你看到的我有“的HashMap”在收集我把我的事件,基于消息的相应的队列“CUSTOMER_ID”属性。 这种功能在多线程访问的情况下需要额外的同步,正如我看到的,spring-kafka只为所有容器创建一个bean实例,而不是为每个容器创建单独的bean实例,以避免并发问题。
如何以编程方式更改此逻辑?
我看到修复这个问题的唯一奇怪的方法是使用两个JVM运行一个单独的应用程序与单线程使用者,因此使用#receive方法访问KafkaConsumer类将是单线程的。
thx解释 – MeetJoeBlack