2016-10-07 110 views
1

我有一个使用生产者类以产生消息的多线程应用程序,前面我使用以下代码来创建生产者为每个request.where KafkaProducer是新与如下每个请求建:如何重新连接一旦关闭卡夫卡生产者?

KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop); 

ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes); 
producer.send(data, new Callback() { 

       @Override 
       public void onCompletion(RecordMetadata metadata, Exception exception) { 
        if (exception != null) { 
         isValidMsg[0] = false; 
         exception.printStackTrace(); 
         saveOrUpdateLog(msgBean, producerType, exception); 
         logger.error("ERROR:Unable to produce message.",exception); 
        } 
       } 
      }); 
producer.close(); 

然后我读卡夫卡关于生产者的文档,并且知道我们应该使用单个生产者实例来获得良好的性能。

然后,我在单例类中创建了一个KafkaProducer实例。

现在当&我们应该关闭生产者。显然,如果我们关闭第一个发送请求后,制片人就不会发现生产者要重新发送消息,因此抛出:

java.lang.IllegalStateException: Cannot send after the producer is closed. 

,或者我们如何可以重新连接到生产者关闭一次。 问题是如果程序崩溃或有异常呢?

回答

2

KafkaProducer.send方法是异步的,它返回一个Future[RecordMetadata]。如果您在发送后立即致电close,则说明您有竞争状况,并且由于KafkaProducer的缓冲性质,您的消息可能永远无法发送。

如果您的生产者在您的应用程序的整个生命周期中都在使用,请不要关闭它,并在应用程序终止后让它死掉。如文档中所述,生产者可以安全地在多线程环境中使用,因此您应该重新使用同一个实例。

如果你仍然认为你需要关闭KafkaProducer在某些情况下,你可以添加一个isClosed标志,你的卡夫卡对象内部和监控,如果消费者需要重新发送数据。草图可以是:

object KafkaOwner { 
    private var producer: KafkaProducer = ??? 
    @volatile private var isClosed = false 

    def close(): Unit = { 
    if (!isClosed) { 
     kafkaProducer.close() 
     isClosed = true 
    } 
    } 

    def instance: KafkaProducer = { 
    this.synchronized { 
     if (!isClosed) producer 
     else { 
     producer = new KafkaProducer() 
     isClosed = false 
     } 
    } 
    } 
} 
+0

它是同步/异步。此外,如果关闭一些如何:例外/应用程序崩溃,然后如何重新连接。请注意,我不会重新初始化KafkaProducer isntance,因为它不为null,并且即使在调用close()方法后也会保留所有属性。此外,我有多个应用程序,即4个使用此共享生产者发送消息到多个主题的消费者。 – usman

+0

@usman你为什么说它既是同步又是异步? [你在哪里看到同步版本](https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html)? –

+0

https://kafka.apache.org/08/documentation#implementation “包装2个低级别生产者的生产者API - ”。那么卡夫卡提供了什么方法呢?你的代码显示我们必须重新实例化对象。 – usman

1

如的javadoc KafkaProducer描述:

public void close() 

Close this producer. This method blocks until all previously sent requests complete. 
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS). 

源:https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

所以你不必担心你的邮件不会被发送即使您在发送后立即致电关闭。

如果您打算多次使用KafkaProducer,请在完成使用后关闭它。如果您仍然希望保证您的消息在您的方法完成之前实际发送,而不是在缓冲区中等待,请使用KafkaProducer#flush(),这将阻塞,直到发送当前缓冲区。如果您愿意,也可以在Future#get()上屏蔽。

还有一个警告要注意的,如果你不打算永远闭上你的KafkaProducer(例如,在短暂的应用程序,您只需发送一些数据和应用程序将立即终止发送后) 。 KafkaProducer IO线程是守护进程线程,这意味着JVM不会等到该线程完成终止虚拟机。因此,要确保您的消息实际上使用KafkaProducer#flush(),no-arg KafkaProducer#close()Future#get()上的阻止。

相关问题