我在本地主机上设置了Kafka,并试图监控Kafka Producer在网络问题情况下的行为。Kafka Producer行为
即使所有经纪人都关门,制片人也不会给出任何错误。 我正在使用同步制作者和Kafka版本0.8。
如果所有的经纪人都失败了,生产者是否可以接收异常?
我在本地主机上设置了Kafka,并试图监控Kafka Producer在网络问题情况下的行为。Kafka Producer行为
即使所有经纪人都关门,制片人也不会给出任何错误。 我正在使用同步制作者和Kafka版本0.8。
如果所有的经纪人都失败了,生产者是否可以接收异常?
请您制作添加异常处理:
try{
Producer logic
}
catch (Exception ex) {
String errorMsg = "Failed to publish events";
logger.error("Failed to publish events", ex);
result = Status.BACKOFF;
请让我知道,如果它仍然无法工作。
这取决于你的生产配置(producer config)
尤其要对参数注意:
(METADATA_FETCH_TIMEOUT_CONFIG, 60000),
(TIMEOUT_CONFIG, 10000),
(RETRY_BACKOFF_MS_CONFIG, 100),
(RECONNECT_BACKOFF_MS_CONFIG, 1000)
你肯定必须改变对您的设置参数。所有这些参数都会影响生产者行为。
Java/Scala(Async producer)中的新8.2生产者为每条消息都有一个回调方法。您可以尝试处理回调中的故障,可能会重试。回调方法需要两个参数(Exception和MessageMetatData)。随时只有一个设置。当您的消息被成功发送时,肉食元数据会被设置,异常情况下会出现问题。
使用同步生产者,虽然你将不得不将max.retries和其他配置设置为@leshkin指出的。
你如何运行你的制作人?它是控制台生产者? – Bector
不,我使用Producer API在Java中制作了一个生产者 – Hubble