2015-12-04 57 views
3

我想在收到消息时得到经纪人的回复。 我曾尝试使用CallBack机制(通过实现CallBack)在KafkaProducer.send中使用,但它没有工作,并没有调用onCompletion方法。

当我关闭Kafka服务器并尝试生成消息时,它确实调用了回调方法。如果消息是由制片人制作的,如何获得卡夫卡经纪人的确认?

是否有任何其他方式获得承认?

@Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     long elapsedTime = System.currentTimeMillis() - startTime; 
     System.out.println("Called Callback method"); 
     if (metadata != null) { 
      System.out.println("message(" + key + ", " + message 
        + ") sent to partition(" + metadata.partition() + "), " 
        + "offset(" + metadata.offset() + ") in " + elapsedTime 
        + " ms"); 
     } else { 
      exception.printStackTrace(); 
     } 

    } 

props.put("bootstrap.servers", "localhost:9092"); 
props.put("client.id", "mytopic"); 
props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class); 
props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class); 

KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props); 
long runtime = new Date().getTime(); 
String ip = "192.168.2."+ rnd.nextInt(255); 
String msg = runtime + ".www.ppop.com," + ip; 
producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), `new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));` 

我正在使用kafka-client api 0.9.1与broker版本0.8.2。

+0

你可以显示你用来实现回调的代码吗?如果这个方法没有被调用,那么问题一定是在其他地方。 – morganw09dev

+0

我刚刚更新我的经纪人到0.9.0.0,它开始工作。现在的问题是,新的api将不会在0.8.2.x版本上调用Callback? – usman

+0

看到我的答案,但再次。向我们展示您用来向Kafka发送数据的代码,这可能是您问题的根源。不建议使用不同的Kafka版本。 – morganw09dev

回答

2

所以我并不是100%确定哪些版本适用于卡夫卡。目前我使用0.8.2,我知道0.9引入了一些突破性更改,但我无法确定现在哪些功能无法正常工作。

一个非常强烈的建议,我会使用对应于您的经纪人版本的卡夫卡客户端版本。如果你使用经纪人0.8.2,我也会使用kakfa-client 0.8.2。

你从来没有提出任何代码如何使用这个,所以我只是在黑暗中猜测。但是我已经在生产者中使用this method在卡夫卡0.8.2中实现了回拨功能。以下是方法签名。

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) 

我将在哪里调用该方法,我实际上是通过overriden方法传递类。

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props); 
ProducerRecord<String, String> record = //data to send to kafka 
prod.send(record, new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception e) { 
    if (e != null) { 
     e.printStackTrace(); 
    } else { 
     //implement logic here, or call another method to process metadata 
     System.out.println("Callback"); 
    } 
    } 
}); 

我假设有一种方法可以做到这一点,因为你已经做到了。但是,您必须提供代码,显示您是如何实际向卡夫卡发送记录的。除此之外,我只是猜测。

3

有一种简单的方法可以在KafkaProducer使用kafka 0.9版发布消息后从代理获取信息。你可以调用get()方法返回一个对象RecordMetadata,你可以得到信息,如胶印,topicPartition,下面的代码片段为例:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
         topic, key.getBytes("UTF-8"), message 
           .getBytes("UTF-8"))).get(); 
System.out.println("Message produced, offset: " + m.offset()); 
System.out.println("Message produced, partition : " + m.partition()); 
System.out.println("Message produced, topic: " + m.topic()); 
+3

与异步相比get()显着降低了性能 – chro

2

关闭生产者 - 一旦你完成所有发布您的邮件 - 这样的:

producer.close(); 

这确保了从回调下面的方法总是叫:

onCompletion(RecordMetadata metadata, Exception exception) 

注:我已经通过添加T线测试这o this sample Producer class,它的工作原理。

相关问题