0

我以前成功地在Kafka(0.10.1.0)之前一直使用pyspark进行Spark Streaming(Spark 2.0.2),但我的目的更适合结构化流式传输。我已经尝试使用范例在线:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlPyspark结构化流式传输卡夫卡配置错误

与以下类似代码:

: org.apache.kafka.common.config.ConfigException: 
Missing required configuration "partition.assignment.strategy" which has no default value 

我也尝试添加:

ds1 = spark 
    .readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
    .option("subscribe", "topic1") 
    .load() 
query = ds1 
    .writeStream 
    .outputMode('append') 
    .format('console') 
    .start() 
query.awaitTermination() 

但是,我始终与下面的错误结束在此创建ds1时的选项集合:

.option("partition.assignment.strategy", "range") 

但是ev明确地给它赋值并不能阻止错误,也没有任何其他值(比如“roundrobin”),我可以在线或在Kafka文档中找到它。

我也试过这个“分配”选项,并取得了相同的错误(我们的卡夫卡主机设置为分配 - 每个消费者只分配一个分区,我们没有任何重新平衡)。

任何想法这里发生了什么?文档没有帮助(可能因为它仍处于试验阶段)。另外,有没有使用KafkaUtils进行结构化流式传输?或者这是唯一的网关?

回答

1
  1. 有卡夫卡0.10.1一个已知的问题。*的客户,你不应该用星火使用它,因为它可能会产生由于https://issues.apache.org/jira/browse/KAFKA-4547错误的答案。您可以使用0.10.0.1客户端,并且它应该可以与0.10.1。* Kafka集群一起使用。

  2. 要将卡夫卡配置发送到结构化数据流中的卡夫卡客户端,您需要添加kafka.前缀,如.option("kafka.partition.assignment.strategy", "range")。但是,您不需要设置kafka.partition.assignment.strategy,因为它具有默认值。我的直觉是你可能把两个Kafka 0.8。*和0.10。* jar放在classpath中,并加载错误的类。

  3. 您想使用KafkaUtils中的哪个API,但在结构化数据流中缺失? Spark 2.2.0刚刚出炉,您可以在结构化流式处理中使用批处理或流式查询与Kafka。例如,请阅读http://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html

0

kafka-clients-*.jar添加到您的spark jar文件夹,然后重新启动spark master和slave。那么你不需要添加.option("partition.assignment.strategy", "range")