我写一个星火结构流应用Pyspark不允许从卡夫卡读取数据。星火流:卡夫卡组ID星火结构化流
但是,Spark的当前版本是2.1.0,它不允许我将group id设置为参数,并会为每个查询生成唯一的id。但卡夫卡连接是基于组的授权,需要预设的组标识。
因此,是否有任何解决方法来建立连接而不需要更新Spark到2.2,因为我的团队不需要它。
我的代码:
if __name__ == "__main__":
spark = SparkSession.builder.appName("DNS").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("WARN")
# Subscribe to 1 topic
lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load()
print(lines.isStreaming) #print TRUE
lines.selectExpr("CAST(value AS STRING)")
# Split the lines into words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Generate running word count
wordCounts = words.groupBy("word").count()
# Start running the query that prints the running counts to the console
query = wordCounts \
.writeStream \
.outputMode("complete") \
.format("console") \
.start()
query.awaitTermination()
我不认为你可以在Spark 2.2中设置'group.id' - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian
根据此[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2,您可以选择设置组ID。但是,请谨慎使用,因为这可能会导致意外的行为._ – ELI
奇怪!因为根据Spark 2.2文档,我们不能。可能两个文件之间存在不匹配。 – himanshuIIITian