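org.apache.kafka.common.errors.RecordTooLargeException in Flume Kafka sink
I am trying to read data from a JMS source and push it into a Kafka topic. After a few hours of doing this, I observed that the rate of pushes to the Kafka topic had dropped to almost zero. After some initial analysis, I found the following exception in the Flume logs.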
28 Feb 2017 16:35:44,758 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:158) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to publish events
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:252)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
at org.apache.flume.sink.kafka.KafkaSink.process(KafkaSink.java:212)
... 3 more
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1399305 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
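My Flume logs show the current max.request.size setting as 1048576, which is clearly much smaller than 1399305. Increasing max.request.size should eliminate these exceptions, but I cannot find the right place to update the value.
My flume.config: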
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.channels.c1.type = file
a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.capacity = 100000000
a1.channels.c1.checkpointDir = /data/flume/apache-flume-1.7.0-bin/checkpoint
a1.channels.c1.dataDirs = /data/flume/apache-flume-1.7.0-bin/data
a1.sources.r1.type = jms
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i1.preserveExisting = true
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = some context urls
a1.sources.r1.connectionFactory = some_queue
a1.sources.r1.providerURL = some_url
#a1.sources.r1.providerURL = some_url
a1.sources.r1.destinationType = QUEUE
a1.sources.r1.destinationName = some_queue_name
a1.sources.r1.userName = some_user
a1.sources.r1.passwordFile= passwd
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = some_kafka_topic
a1.sinks.k1.kafka.bootstrap.servers = some_URL
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.flumeBatchSize = 1
a1.sinks.k1.channel = c1
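Any help would be greatly appreciated!
I am using Flume, which uses Kafka's producer library to push messages to the topic, but I cannot see this setting exposed as configurable in Flume. Do I need to change a hard-coded value in the producer class?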
@RiteshSharma Are you saying that you don't have Kafka installed on the server? – franklinsijo
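Actually, this max.request.size problem is on the Flume side. I am using the Kafka sink to push data to the Kafka brokers, so Flume is essentially using the Kafka producer library (through the Kafka sink) to push the data. Flume does not provide a dedicated configuration file such as producer.properties; you simply set the Kafka producer properties in the Flume configuration.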
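For reference, a minimal sketch of what that might look like in the flume.config above. The Kafka sink passes any property prefixed with kafka.producer. through to the underlying producer, in the same way as the existing kafka.producer.acks line. The value 2097152 (2 MiB) is an assumption, picked only because it exceeds the 1399305-byte record reported in the log; size it for your own largest message.

```
# Assumed value: 2 MiB, larger than the 1399305-byte record in the stack trace.
# The kafka.producer. prefix forwards max.request.size to the Kafka producer.
a1.sinks.k1.kafka.producer.max.request.size = 2097152
```

If the broker then rejects the larger records, the broker-side limit (message.max.bytes, or max.message.bytes on the topic) and the consumers' fetch size will probably need to be raised as well; that part depends on your cluster configuration and is not shown in the question.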