我有一个使用生产者类以产生消息的多线程应用程序,前面我使用以下代码来创建生产者为每个request.where KafkaProducer是新与如下每个请求建:如何重新连接一旦关闭卡夫卡生产者?
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
isValidMsg[0] = false;
exception.printStackTrace();
saveOrUpdateLog(msgBean, producerType, exception);
logger.error("ERROR:Unable to produce message.",exception);
}
}
});
producer.close();
然后我读卡夫卡关于生产者的文档,并且知道我们应该使用单个生产者实例来获得良好的性能。
然后,我在单例类中创建了一个KafkaProducer实例。
现在当&我们应该关闭生产者。显然,如果我们关闭第一个发送请求后,制片人就不会发现生产者要重新发送消息,因此抛出:
java.lang.IllegalStateException: Cannot send after the producer is closed.
,或者我们如何可以重新连接到生产者关闭一次。 问题是如果程序崩溃或有异常呢?
它是同步/异步。此外,如果关闭一些如何:例外/应用程序崩溃,然后如何重新连接。请注意,我不会重新初始化KafkaProducer isntance,因为它不为null,并且即使在调用close()方法后也会保留所有属性。此外,我有多个应用程序,即4个使用此共享生产者发送消息到多个主题的消费者。 – usman
@usman你为什么说它既是同步又是异步? [你在哪里看到同步版本](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –
https://kafka.apache.org/08/documentation#implementation “包装2个低级别生产者的生产者API - ”。那么卡夫卡提供了什么方法呢?你的代码显示我们必须重新实例化对象。 – usman