1
我用下面的代码C++
产生消息Kafka
:C++与卡夫卡 - 消费者只得到一些制片人的消息
#include <thread>
#include <cppkafka/producer.h>
using namespace cppkafka;
int main()
{
for(int i = 0 ; i < 100 ; i++)
{
std::cout << "sending msg number: " << i << std::endl;
std::string str("{'msg number' : " + std::to_string(i) + "}");
// Create a message builder for this topic
MessageBuilder builder("test");
// Construct the configuration
Configuration config =
{
{ "metadata.broker.list", "192.168.1.100:9092"}
};
// Create the producer
Producer producer(config);
builder.payload(str);
producer.produce(builder); //Only a few messages are received!
std::this_thread::sleep_for(std::chrono::milliseconds(50));//If I remove this, no message is received!
}
}
在我的机器,我已经运行Zookeeper
和Kafka server
,我跑一个consumer
,以显示接收的消息,通过使用这样的:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test
我C++
代码产生以下:
sending msg number: 0
sending msg number: 1
sending msg number: 2
sending msg number: 3
(...) //from 0 to 99...all the messages are sent!
sending msg number: 98
sending msg number: 99
我期待消费者,接收所有这些消息,但我看到的只是几个:
{'msg number' : 40}
{'msg number' : 58}
{'msg number' : 70}
{'msg number' : 75}
{'msg number' : 91}
{'msg number' : 96}
并没有更多的好评。
如果我删除行:
std::this_thread::sleep_for(std::chrono::milliseconds(50));
我没有收到任何消息。 为什么我的Kafka
服务器没有收到我的所有消息?
难道是某种形式的垃圾邮件防护,即从过快发送消息和服务器只是开沟他们>? – GPPK
我想到了这一点,这就是为什么我放置了睡眠(50毫秒)。无论哪种方式,如果我使用这个,我仍然没有收到所有的消息..卡夫卡是否应该能够接收一切,并排队他们? – waas1919
嗨@GPPK你可以把你的评论作为答案?你是对的,这就是这个问题的答案。 – waas1919