我以前成功地在Kafka(0.10.1.0)之前一直使用pyspark进行Spark Streaming(Spark 2.0.2),但我的目的更适合结构化流式传输。我已经尝试使用范例在线:https://spark.apache.org/docs/2.1.0/structured-streaming-kafka-integration.htmlPyspark结构化流式传输卡夫卡配置错误
与以下类似代码:
: org.apache.kafka.common.config.ConfigException:
Missing required configuration "partition.assignment.strategy" which has no default value
我也尝试添加:
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
query = ds1
.writeStream
.outputMode('append')
.format('console')
.start()
query.awaitTermination()
但是,我始终与下面的错误结束在此创建ds1时的选项集合:
.option("partition.assignment.strategy", "range")
但是ev明确地给它赋值并不能阻止错误,也没有任何其他值(比如“roundrobin”),我可以在线或在Kafka文档中找到它。
我也试过这个“分配”选项,并取得了相同的错误(我们的卡夫卡主机设置为分配 - 每个消费者只分配一个分区,我们没有任何重新平衡)。
任何想法这里发生了什么?文档没有帮助(可能因为它仍处于试验阶段)。另外,有没有使用KafkaUtils进行结构化流式传输?或者这是唯一的网关?