2017-03-17 82 views
0

我已经设置了多个主题接收消息的火花卡夫卡消费者在斯卡拉:Kafka的Spark流:如何从Kafka consumer DStream获取主题名称?

val properties = readProperties() 
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream") 
val ssc = new StreamingContext(streamConf, Seconds(10)) 

val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"), 
         "zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"), 
         "group.id"    -> properties.getProperty("group_id"), 
         "auto.offset.reset" -> properties.getProperty("offset_reset") 
        ) 

// Kafka integration with receiver 
val msgStream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc, kafkaParams, Map(properties.getProperty("topic1") -> 1, 
         properties.getProperty("topic2") -> 2, 
         properties.getProperty("topic3") -> 3), 
         StorageLevel.MEMORY_ONLY_SER).map(_._2) 

我需要从每个主题制定相应的动作代码的消息(这将是JSON格式)。

我提到了以下问题,但在它的回答对我没有帮助:

get topic from Kafka message in spark

那么,有没有可以用来获取沿着主题名称接收到DSTREAM任何方法与消息确定应该发生什么行动?

任何帮助,将不胜感激。谢谢。

+0

你用什么版本的'spark'? – semsorock

+0

spark-streaming_2.11 - > 1.6.1,spark-streaming-kafka_2.11 - > 1.6.1“和 kafka_2.11 - > 0.8.2.2 – Arjun

回答

0

请参阅下面的代码。

您可以通过foreachRDD,DStream上的地图操作来获取主题名称和消息。

msgStream.foreachRDD(rdd => { 
     val pairRdd = rdd.map(i => (i.topic(), i.value())) 
}) 

下面的代码是我使用的createDirectStream的示例源。

val ssc = new StreamingContext(configLoader.sparkConfig, Seconds(conf.getInt(Conf.KAFKA_PULL_INTERVAL))) 
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> conf.getString(Conf.KAFKA_BOOTSTRAP_SERVERS), 
    "key.deserializer" -> classOf[StringDeserializer], 
    "value.deserializer" -> classOf[StringDeserializer], 
    "group.id" -> conf.getString(Conf.KAFKA_CONSUMER_GID), 
    "auto.offset.reset" -> conf.getString(Conf.KAFKA_AUTO_OFFSET_RESET), 
    "enable.auto.commit" -> (false: java.lang.Boolean) 
) 
val topics: Array[String] = conf.getString(Conf.KAFKA_TOPICS).split(",") 
val stream = KafkaUtils.createDirectStream[String, String](
    ssc, 
    PreferConsistent, 
    Subscribe[String, String](topics, kafkaParams) 
) 
+0

感谢您的回应,但它抛出错误(i.topic (),i.value()):value主题不是String的成员,那么这些方法的主题()和值()会出现在rdd的元素上吗? – Arjun

+0

对不起,如果使用createStream,知道主题名称,如果你需要主题名称,你必须使用createDirectStream实现它,使用createDirectStream,你可以通过上面的代码获得主题名称 –

+0

好的,在问题陈述中分享的另一个问题中,他使用createDirectStream。但是我在传递参数时遇到了问题,你可以向我展示createDirectStream实现的工作代码,因为它有很多变体吗? – Arjun

相关问题