2016-08-28 50 views
1

我正在使用java api实现apache kafka生产者。 Apache Kafka安装在本地主机上。 Zookeeper也在运行,但仍然producer.send()函数停留在发送消息和消息未发布。kafka java生产者卡在产生消息

我已经创建了“快速消息”主题。

以下是代码。

package com.hsahu.kafka.producer; 

import java.util.Properties; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 

public class KafkaProducerExample { 
public static void main(String[] args) { 

    Properties props = new Properties(); 

    props.put("bootstrap.servers", "localhost:9092"); 
    props.put("acks", "all"); 
    props.put("retries", 0); 
    props.put("batch.size", 16384); 
    props.put("linger.ms", 0); 
    props.put("buffer.memory", 33554432); 
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

    KafkaProducer<String, String> producer = new KafkaProducer<>(props); 

    try { 
     producer.send(new ProducerRecord<String, String>("fast-messages", "This is a dummy message")); 
    } catch(Exception ex) { 
     System.out.println(ex); 
    } 

    System.out.println("message publisher"); 

    producer.close(); 
} 

}

我应该怎么办?是我的代码错误或任何属性设置不正确或丢失?

回答

2

代码中没有任何问题。只有api版本和kafka服务器版本不匹配。所以我只更正了api版本,现在制作人正在工作。

+0

我同意。我使用的是kafka客户端0.10;然而,我的经纪人版本是0.9。我升级到.10,一切都很好。 –

1

测试下方

如果以上版本0.9则需要在经纪人CONFIG “advertised.host.name”

0

你可以尝试producer.flush()而不是producer.close()。 Flush()块直到消息发送到卡夫卡经纪人?我没有看到任何其他奇怪的东西。