
I am trying to read data from a JMS source and push it into a Kafka topic. After doing this for a few hours, I observed that the rate of pushes to the Kafka topic dropped to almost zero. After some initial analysis, I found the following org.apache.kafka.common.errors.RecordTooLargeException in the Flume logs, thrown by the Flume Kafka sink.

28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to publish events 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252) 
     at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67) 
     at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 
     at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686) 
     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449) 
     at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212) 
     ... 3 more 
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration. 

My Flume logs show the current max.request.size value as 1048576, which is clearly much smaller than 1399305. Increasing max.request.size should eliminate these exceptions, but I cannot find the right place to update that value.

My flume.config:

a1.sources = r1 
a1.channels = c1 
a1.sinks = k1 

a1.channels.c1.type = file 
a1.channels.c1.transactionCapacity = 1000 
a1.channels.c1.capacity = 100000000 
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint 
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data 

a1.sources.r1.type = jms 

a1.sources.r1.interceptors.i1.type = timestamp 
a1.sources.r1.interceptors.i1.preserveExisting = true 

a1.sources.r1.channels = c1 
a1.sources.r1.initialContextFactory = some context urls 
a1.sources.r1.connectionFactory = some_queue 
a1.sources.r1.providerURL = some_url 
#a1.sources.r1.providerURL = some_url 
a1.sources.r1.destinationType = QUEUE 
a1.sources.r1.destinationName = some_queue_name 
a1.sources.r1.userName = some_user 
a1.sources.r1.passwordFile= passwd 

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic = some_kafka_topic 
a1.sinks.k1.kafka.bootstrap.servers = some_URL 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.flumeBatchSize = 1 
a1.sinks.k1.channel = c1 

Any help would be greatly appreciated!

Answers


This change has to be made on the Kafka side. Update the Kafka producer configuration file producer.properties with a larger value, for example:

max.request.size=10000000 

I am using Flume, which uses the Kafka producer library to push messages onto the topic, but I cannot see this as configurable in Flume; do I need to change any hard-coded value in the producer class? –


@RiteshSharma Are you saying you don't have Kafka installed on the server? – franklinsijo


Actually this "max.request.size" issue is with Flume. I am using the Kafka sink to push data to the Kafka brokers, so essentially Flume uses the Kafka producer library (via the Kafka sink) to push data to the brokers. Flume does not provide a dedicated configuration file like "producer.properties"; you have to update the Kafka producer properties in the Flume configuration itself. –


It seems I have solved my problem. As suspected, increasing max.request.size eliminated the exceptions. To update such Kafka sink (producer) properties, Flume provides a fixed prefix, "kafka.producer.", and we can append any Kafka producer property to that prefix.

So mine goes as:

a1.sinks.k1.kafka.producer.max.request.size = 5271988
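
For reference, a minimal sketch of the sink section from the question with this override in place (5271988 is just the value used above; pick anything larger than your biggest expected record). One caveat: the brokers must also accept records of that size (broker setting message.max.bytes, or max.message.bytes per topic), otherwise the broker will reject what the producer now allows.

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink 
a1.sinks.k1.kafka.topic = some_kafka_topic 
a1.sinks.k1.kafka.bootstrap.servers = some_URL 
a1.sinks.k1.kafka.producer.acks = 1 
# anything after the kafka.producer. prefix is passed straight through to the Kafka producer 
a1.sinks.k1.kafka.producer.max.request.size = 5271988 
a1.sinks.k1.flumeBatchSize = 1 
a1.sinks.k1.channel = c1 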


Wow. Never knew this was possible! – franklinsijo