2016-08-23 108 views
0

我运行Cloudera公司提供的卡夫卡0.9.0版本,我已经写了一个自定义的生产者Java的生产者不承认财产“Partitioner.class”属性

我加入以下属性来我ProducerConfig:

 Properties props = new Properties(); 
     props.put("bootstrap.servers", brokers); 
     props.put("acks", "all"); 
     //props.put("metadata.broker.list", this.getBrokers()); 
     //props.put("serializer.class", "kafka.serializer.StringEncoder"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("partitioner.class", "com.vikas.MyPartitioner"); 

然而,当我运行我的程序,我对http://kafka.apache.org/090/documentation.html#producerconfigs

收到此错误

16/08/23 18:00:33 INFO producer.ProducerConfig: ProducerConfig values: 
     value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
     key.serializer = class org.apache.kafka.common.serialization.StringSerializer 
     block.on.buffer.full = true 
     retry.backoff.ms = 100 
     buffer.memory = 33554432 
     batch.size = 16384 
     metrics.sample.window.ms = 30000 
     metadata.max.age.ms = 300000 
     receive.buffer.bytes = 32768 
     timeout.ms = 30000 
     max.in.flight.requests.per.connection = 5 
     bootstrap.servers = [server01:9092, server02:9092] 
     metric.reporters = [] 
     client.id = 
     compression.type = none 
     retries = 0 
     max.request.size = 1048576 
     send.buffer.bytes = 131072 
     acks = all 
     reconnect.backoff.ms = 10 
     linger.ms = 1 
     metrics.num.samples = 2 
     metadata.fetch.timeout.ms = 60000 

16/08/23 18:00:33 WARN producer.ProducerConfig: The configuration partitioner.class = null was supplied but isn't a known config. 

按文档

partitioner,class是一个有效的属性,但我不知道为什么kafka抱怨它是一个未知的配置。

+0

尝试把它当作一个类值,而不是字符串:'props.put(“分区。类“,MyPartitioner.class)' – serejja

+0

@serejja,错误说它不是一个已知的配置。 –

+0

@serejja,我尝试了你所建议的改变,但我仍然得到相同的错误 –

回答

0

看看这篇文章http://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html它有如何使用自定义分区的示例。

正如你可以看到该错误消息是

“16/08/23 18点○○分33秒WARN producer.ProducerConfig:配置partitioner.class = NULL供给,但不是已知的配置”。这意味着卡夫卡不理解的关键partitioner.class

您可能要设置分区这样

 configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.vikas.MyPartitioner"); 
+0

根据http://kafka.apache.org/090/documentation.html#producerconfigs 的文档partitioner.class是一个有效的属性,但我不知道为什么kafka抱怨它是一个未知的配置。我会测试你今天提供的解决方案。 –

+0

对不起@SunilPatil它不起作用:( –

+0

试试这个代码文章http://www.javaworld.com/article/3066873/big-data/big-data-messaging-with-kafka-part-2.html。它有自定义分区程序部分,示例代码位于https://github.com/sdpatil/KafkaAPIClient/tree/master/src/main/java/com/spnotes/kafka/partition中。尝试com.spnotes.kafka.partition .Producer类使用自定义分区程序 –