2017-08-25 83 views

回答

1

新的生产者总是异步的,你应该调用future.get()来使其同步。当添加future.get()等基本相同的功能时,创建两个apis方法并不值得。

从发送的文档()这里

https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

由于发送调用是异步它返回 RecordMetadata将被分配到该记录中的未来。调用 上的get()将会阻塞,直到关联的请求完成,然后 返回记录的元数据或抛出发送记录时发生的任何异常 。

如果要模拟一个简单的阻塞调用你可以立即调用get()方法 :

byte[] key = "key".getBytes(); 
byte[] value = "value".getBytes(); 
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value); 
producer.send(record).get(); 
+0

谢谢。有没有文档? – ntysdd

+0

0.11生产者javadocs在这里https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html –

+0

其实我已经读过了。我只想知道一些“官方”的东西。 – ntysdd

0

为什么你想让send()同步?

这是一个kafka功能来批量消息以获得更好的吞吐量。

异步发送

配料是效率的主要驱动力之一,以使拌和卡夫卡生产者将试图在内存中积累的数据,并在单个请求发送大批量。批处理可以配置为累积不超过固定数量的消息,并且不超过某个固定的延迟限制(比如64k或10ms)。这允许发送更多字节的累积,并且在服务器上几个较大的I/O操作。这种缓冲是可配置的,并提供了一种机制来折中少量额外的延迟以获得更好的吞吐量。

由于api只支持异步方法,所以没有办法做一个发送同步,但是有一些配置可以指定做一些工作。

您可以将batch.size设置为0.在这种情况下,禁用消息传递。

不过我想你应该刚刚离开batch.size默认并设置linger.ms为0(这也是默认值)。在这种情况下,如果同时发送多条消息,则会立即将其批量发送一次。

生产者将在请求传输之间到达的任何记录分组成单个批处理请求。通常情况下,这只会在记录到达速度快于发送时才发生。

如果你想确保该消息发送成功坚持,你coould设置的ACK为-1或1 〜3(例如)关于

更多信息生产者配置,你可以参考https://kafka.apache.org/documentation/#producerconfigs

+0

我相信同步发送是在某些情况下是有用的。或者为什么他们提供过去? – ntysdd

+0

例如,我想确认我的留言是持续的 – ntysdd

+0

答案更新后,希望可以帮到你。 – GuangshengZuo

相关问题