我已经设置了多个主题接收消息的火花卡夫卡消费者在斯卡拉:Kafka的Spark流:如何从Kafka consumer DStream获取主题名称?
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"),
"zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> properties.getProperty("group_id"),
"auto.offset.reset" -> properties.getProperty("offset_reset")
)
// Kafka integration with receiver
val msgStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
ssc, kafkaParams, Map(properties.getProperty("topic1") -> 1,
properties.getProperty("topic2") -> 2,
properties.getProperty("topic3") -> 3),
StorageLevel.MEMORY_ONLY_SER).map(_._2)
我需要从每个主题制定相应的动作代码的消息(这将是JSON格式)。
我提到了以下问题,但在它的回答对我没有帮助:
get topic from Kafka message in spark
那么,有没有可以用来获取沿着主题名称接收到DSTREAM任何方法与消息确定应该发生什么行动?
任何帮助,将不胜感激。谢谢。
你用什么版本的'spark'? – semsorock
spark-streaming_2.11 - > 1.6.1,spark-streaming-kafka_2.11 - > 1.6.1“和 kafka_2.11 - > 0.8.2.2 – Arjun