2017-10-12 193 views
1

我用下面的代码C++产生消息KafkaC++与卡夫卡 - 消费者只得到一些制片人的消息

#include <thread> 
#include <cppkafka/producer.h> 

using namespace cppkafka; 

int main() 
{ 
    for(int i = 0 ; i < 100 ; i++) 
    { 
     std::cout << "sending msg number: " << i << std::endl; 
     std::string str("{'msg number' : " + std::to_string(i) + "}"); 

     // Create a message builder for this topic 
     MessageBuilder builder("test"); 

     // Construct the configuration 
     Configuration config = 
     { 
      { "metadata.broker.list", "192.168.1.100:9092"} 
     }; 

     // Create the producer 
     Producer producer(config); 

     builder.payload(str); 

     producer.produce(builder); //Only a few messages are received! 

     std::this_thread::sleep_for(std::chrono::milliseconds(50));//If I remove this, no message is received! 
    } 
} 

在我的机器,我已经运行ZookeeperKafka server,我跑一个consumer,以显示接收的消息,通过使用这样的:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test 

C++代码产生以下:

sending msg number: 0 
sending msg number: 1 
sending msg number: 2 
sending msg number: 3 
(...) //from 0 to 99...all the messages are sent! 
sending msg number: 98 
sending msg number: 99 

我期待消费者,接收所有这些消息,但我看到的只是几个:

{'msg number' : 40} 
{'msg number' : 58} 
{'msg number' : 70} 
{'msg number' : 75} 
{'msg number' : 91} 
{'msg number' : 96} 

并没有更多的好评。

如果我删除行:

std::this_thread::sleep_for(std::chrono::milliseconds(50)); 

我没有收到任何消息。 为什么我的Kafka服务器没有收到我的所有消息?

+1

难道是某种形式的垃圾邮件防护,即从过快发送消息和服务器只是开沟他们>? – GPPK

+0

我想到了这一点,这就是为什么我放置了睡眠(50毫秒)。无论哪种方式,如果我使用这个,我仍然没有收到所有的消息..卡夫卡是否应该能够接收一切,并排队他们? – waas1919

+1

嗨@GPPK你可以把你的评论作为答案?你是对的,这就是这个问题的答案。 – waas1919

回答

1

原文评论:

这可能是有点垃圾邮件防御,即从过快发送消息和服务器只是抛弃他们

有很多不同的反垃圾邮件技术开发由网络服务提供商提供这是通过发送大量网络交易来阻止某人发送垃圾邮件。我觉得可能是你在做什么

https://en.wikipedia.org/wiki/Anti-spam_techniques