2017-08-09

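Kafka-Flink: Kafka producer error depending on Flink jobs

While doing a proof of concept with Kafka and Flink, I found the following: it seems that Kafka producer errors can occur depending on the workload on the Flink side!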

Here are the details:

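I have sample files (e.g. sample01.EDR) with ~700'000 lines of records containing "entity", "value", "timestamp".

I used the following command to create the Kafka topic: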

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 1 --replication-factor 1 --topic gprs 

I loaded the sample file into the topic with the following command:

% /home/kafka/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic gprs < ~/sample/sample01.EDR 

On the Flink side, I have jobs that aggregate the values per entity over sliding windows of 6 hours and 72 hours (aggregationeachsix, aggregationeachsentytwo).
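A rough way to see why the 72-hour job is heavier than the 6-hour one: with sliding windows, every incoming event is assigned to size/slide windows, and each of those windows keeps its own aggregate. The sketch below reproduces that assignment arithmetic (as I understand Flink's sliding window assigners, with zero offset) in plain bash. The 1-hour slide is a hypothetical value, since the question does not say which slide the jobs use.

```shell
#!/usr/bin/env bash
# Sketch of sliding-window assignment arithmetic (zero offset).
# All values are in milliseconds; assumes non-negative timestamps.
# HOUR is used as a HYPOTHETICAL slide; the real jobs' slide is unknown.
HOUR=$((60 * 60 * 1000))

# Print the start of every window of length $2, sliding by $3,
# that contains timestamp $1 (newest window first).
window_starts() {
  local ts=$1 size=$2 slide=$3
  local last_start=$(( ts - ts % slide ))   # start of the newest window
  local start
  for (( start = last_start; start > ts - size; start -= slide )); do
    echo "$start"
  done
}

ts=1502276213000   # arbitrary example event timestamp
echo "windows per event, 6h size:  $(window_starts "$ts" $((6 * HOUR)) "$HOUR" | wc -l)"
echo "windows per event, 72h size: $(window_starts "$ts" $((72 * HOUR)) "$HOUR" | wc -l)"
```

With the same slide, each event ends up in 6 windows for the 6-hour job and 72 windows for the 72-hour job, i.e. twelve times as many window updates on a machine that, in this setup, is shared with the Kafka broker.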

I ran three scenarios:

  1. load the file into the topic with no job running
  2. load the file into the topic with the aggregationeachsix job running
  3. load the file into the topic with the aggregationeachsentytwo job running

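The result is that the first two scenarios work, but in the third scenario I get the following errors on the Kafka producer side while loading the file (not always on the same file; it can happen on the first, second, third or an even later file):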

[plenty of lines before this part] 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,409] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 233 record(s) for gprs-0: 1560 ms has passed since last append 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1627 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1626 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1625 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,412] WARN Got error produce response with correlation id 1624 on topic-partition gprs-0, retrying (2 attempts left). Error: NETWORK_EXCEPTION (org.apache.kafka.clients.producer.internals.Sender) 
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 35 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
    [2017-08-09 12:56:53,515] ERROR Error when sending message to topic gprs with key: null, value: 37 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback) 
    org.apache.kafka.common.errors.TimeoutException: Expiring 8 record(s) for gprs-0: 27850 ms has passed since batch creation plus linger time 
    [plenty of lines after this part] 
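The two TimeoutException messages above are what the producer logs when it drops batches that have waited in its buffer longer than its request timeout. Below is a simplified bash model of that check for a batch that is not being retried, based on my reading of the 0.10/0.11-era producer client; it is an assumption for illustration, not Kafka's actual code. The 1500 ms timeout in the example is assumed to be the default of kafka-console-producer.sh's `--request-timeout-ms` option in those versions, which would explain why 1560 ms was already enough to expire a batch.

```shell
#!/usr/bin/env bash
# SIMPLIFIED model of producer batch expiry for a batch that is not in retry.
# Arguments (all times in ms): now created last_append linger request_timeout is_full
batch_expiry_reason() {
  local now=$1 created=$2 last_append=$3 linger=$4 request_timeout=$5 is_full=$6
  if [ "$is_full" = "true" ] && (( now - last_append > request_timeout )); then
    # A full batch that has not been sent within the timeout since its last append
    echo "$(( now - last_append )) ms has passed since last append"
  elif (( now - (created + linger) > request_timeout )); then
    # Any batch whose age (beyond linger) exceeds the timeout
    echo "$(( now - (created + linger) )) ms has passed since batch creation plus linger time"
  else
    echo "not expired"
  fi
}

# Numbers chosen to match the log lines above, with a 1500 ms timeout
batch_expiry_reason 1560 0 0 0 1500 true
batch_expiry_reason 27850 0 27000 0 1500 false
```

If this reading holds, passing a larger value such as `--request-timeout-ms 30000` to kafka-console-producer.sh would give a broker that shares its machine with Flink more time before batches are dropped; it would not remove the underlying resource contention.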

My question is: why might Flink have an impact on the Kafka producer, and what do I need to change to avoid this error?

Answer


It looks like you are saturating the network when both Flink and the kafka-producer are using it, hence the TimeoutExceptions.


Everything runs on the same server (Kafka and Flink)...


Then you are probably hitting the limits of running Kafka on a single machine.