0

我写一个星火结构流应用Pyspark不允许从卡夫卡读取数据。星火流:卡夫卡组ID星火结构化流

但是,Spark的当前版本是2.1.0,它不允许我将group id设置为参数,并会为每个查询生成唯一的id。但卡夫卡连接是基于组的授权,需要预设的组标识。

因此,是否有任何解决方法来建立连接而不需要更新Spark到2.2,因为我的团队不需要它。

我的代码:

if __name__ == "__main__": 
    spark = SparkSession.builder.appName("DNS").getOrCreate() 
    sc = spark.sparkContext 
    sc.setLogLevel("WARN") 

    # Subscribe to 1 topic 
    lines = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:9092").option("subscribe", "record").option('kafka.security.protocol',"SASL_PLAINTEXT").load() 
    print(lines.isStreaming) #print TRUE 
    lines.selectExpr("CAST(value AS STRING)") 
    # Split the lines into words 
    words = lines.select(
    explode(
     split(lines.value, " ") 
     ).alias("word") 
    ) 
    # Generate running word count 
    wordCounts = words.groupBy("word").count() 

    # Start running the query that prints the running counts to the console 
    query = wordCounts \ 
     .writeStream \ 
     .outputMode("complete") \ 
     .format("console") \ 
     .start() 

    query.awaitTermination() 
+0

我不认为你可以在Spark 2.2中设置'group.id' - http://spark.apache.org/docs/latest/structured-streaming-kafka -integration.html#kafka-specific-configurations – himanshuIIITian

+0

根据此[Databricks doc](https://docs.databricks.com/spark/latest/structured-streaming/kafka.html)_Since Spark 2.2,您可以选择设置组ID。但是,请谨慎使用,因为这可能会导致意外的行为._ – ELI

+0

奇怪!因为根据Spark 2.2文档,我们不能。可能两个文件之间存在不匹配。 – himanshuIIITian

回答

0

KafkaUtils类将覆盖参数值"group.id"。它将从原始组ID中接收"spark-executor-"

下面是KafkaUtils其中这样的代码:

// driver and executor should be in different consumer groups 
    val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG) 
    if (null == originalGroupId) { 
     logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it") 
    } 
    val groupId = "spark-executor-" + originalGroupId 
    logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}") 
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId) 

我们面临着同样的问题。 Kafka基于带预设组ID的ACL,因此唯一的办法是在kafka配置中更改组ID。我们的原始团队ID的insead我们把"spark-executor-" + originalGroupId

+0

我正在使用Spark结构化流(上面的代码),它直接从kafka读取流数据而不创建流上下文 – ELI

相关问题