
How can I identify the topic name from a message consumed from Kafka?

String[] topics = { "test", "test1", "test2" };
for (String t : topics) {
    topicMap.put(t, new Integer(3));
}

SparkConf conf = new SparkConf().setAppName("KafkaReceiver")
        .set("spark.streaming.receiver.writeAheadLog.enable", "false")
        .setMaster("local[4]")
        .set("spark.cassandra.connection.host", "localhost");
final JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(1000));

/* Receive Kafka streaming inputs */
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
        .createStream(jssc, "localhost:2181", "test-group", topicMap);

JavaDStream<MessageAndMetadata> data =
        messages.map(new Function<Tuple2<String, String>, MessageAndMetadata>() {
            public MessageAndMetadata call(Tuple2<String, String> message) {
                // Only the (key, value) pair is available here; the topic name is not.
                System.out.println("message = " + message._2);
                return null;
            }
        });

I can receive messages from the Kafka producer. But since the consumer is now consuming three topics, I need a way to determine which topic each message came from.

I'm interested in the answer to this too. Did you find a way? –

@Arun: did you find a solution? If so, could you share it? Thanks! – jithinpt

Answers

0

Unfortunately, it isn't that simple, because KafkaReceiver and ReliableKafkaReceiver in Spark's source code only store MessageAndMetadata.key and the message itself.

There are two open JIRA tickets in Spark related to this issue.

In the meantime, here is a dirty copy/paste/modify of Spark's source code to solve your problem:

package org.apache.spark.streaming.kafka 

import java.lang.{Integer => JInt} 
import java.util.{Map => JMap, Properties} 

import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector} 
import kafka.serializer.{Decoder, StringDecoder} 
import kafka.utils.VerifiableProperties 
import org.apache.spark.Logging 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} 
import org.apache.spark.streaming.dstream.ReceiverInputDStream 
import org.apache.spark.streaming.receiver.Receiver 
import org.apache.spark.streaming.util.WriteAheadLogUtils 
import org.apache.spark.util.ThreadUtils 
import scala.collection.JavaConverters._ 
import scala.collection.Map 
import scala.reflect._ 

object MoreKafkaUtils { 

    def createStream(
    jssc: JavaStreamingContext, 
    zkQuorum: String, 
    groupId: String, 
    topics: JMap[String, JInt], 
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 
): JavaReceiverInputDStream[(String, String, String)] = { 
    val kafkaParams = Map[String, String](
     "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, 
     "zookeeper.connection.timeout.ms" -> "10000") 
    val walEnabled = WriteAheadLogUtils.enableReceiverLog(jssc.ssc.conf) 
    new KafkaInputDStreamWithTopic[String, String, StringDecoder, StringDecoder](jssc.ssc, kafkaParams, topics.asScala.mapValues(_.intValue()), walEnabled, storageLevel) 
    } 

} 

private[streaming] 
class KafkaInputDStreamWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    @transient ssc_ : StreamingContext, 
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    useReliableReceiver: Boolean, 
    storageLevel: StorageLevel 
) extends ReceiverInputDStream[(K, V, String)](ssc_) with Logging { 

    def getReceiver(): Receiver[(K, V, String)] = { 
    if (!useReliableReceiver) { 
     new KafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } else { 
     new ReliableKafkaReceiverWithTopic[K, V, U, T](kafkaParams, topics, storageLevel) 
    } 
    } 
} 

private[streaming] 
class KafkaReceiverWithTopic[ 
    K: ClassTag, 
    V: ClassTag, 
    U <: Decoder[_] : ClassTag, 
    T <: Decoder[_] : ClassTag](
    kafkaParams: Map[String, String], 
    topics: Map[String, Int], 
    storageLevel: StorageLevel 
) extends Receiver[(K, V, String)](storageLevel) with Logging { 

    // Connection to Kafka 
    var consumerConnector: ConsumerConnector = null 

    def onStop() { 
    if (consumerConnector != null) { 
     consumerConnector.shutdown() 
     consumerConnector = null 
    } 
    } 

    def onStart() { 

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) 

    // Kafka connection properties 
    val props = new Properties() 
    kafkaParams.foreach(param => props.put(param._1, param._2)) 

    val zkConnect = kafkaParams("zookeeper.connect") 
    // Create the connection to the cluster 
    logInfo("Connecting to Zookeeper: " + zkConnect) 
    val consumerConfig = new ConsumerConfig(props) 
    consumerConnector = Consumer.create(consumerConfig) 
    logInfo("Connected to " + zkConnect) 

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[K]] 
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
     .newInstance(consumerConfig.props) 
     .asInstanceOf[Decoder[V]] 

    // Create threads for each topic/message Stream we are listening 
    val topicMessageStreams = consumerConnector.createMessageStreams(
     topics, keyDecoder, valueDecoder) 

    val executorPool = 
     ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") 
    try { 
     // Start the messages handler for each partition 
     topicMessageStreams.values.foreach { streams => 
     streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } 
     } 
    } finally { 
     executorPool.shutdown() // Just causes threads to terminate after work is done 
    } 
    } 

    // Handles Kafka messages 
    private class MessageHandler(stream: KafkaStream[K, V]) 
    extends Runnable { 
    def run() { 
     logInfo("Starting MessageHandler.") 
     try { 
     val streamIterator = stream.iterator() 
     while (streamIterator.hasNext()) { 
      val msgAndMetadata = streamIterator.next() 
      store((msgAndMetadata.key, msgAndMetadata.message, msgAndMetadata.topic)) 
     } 
     } catch { 
     case e: Throwable => reportError("Error handling message; exiting", e) 
     } 
    } 
    } 

} 
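For what it's worth, a rough sketch of how the Java driver from the question could consume this stream, assuming the Scala file above is compiled into the same project. Note that Scala default arguments are not visible from Java, so the StorageLevel has to be passed explicitly, and the tuple layout (key, message, topic) follows the store() call above:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.kafka.MoreKafkaUtils;
import scala.Tuple3;

JavaReceiverInputDStream<Tuple3<String, String, String>> messagesWithTopic =
    MoreKafkaUtils.createStream(jssc, "localhost:2181", "test-group", topicMap,
        StorageLevel.MEMORY_AND_DISK_SER_2());

messagesWithTopic.foreachRDD(
    new Function<JavaRDD<Tuple3<String, String, String>>, Void>() {
        public Void call(JavaRDD<Tuple3<String, String, String>> rdd) {
            // collect() is only for demo printing; avoid it on large batches
            for (Tuple3<String, String, String> t : rdd.collect()) {
                // _1 = key, _2 = message, _3 = topic, matching the store() call above
                System.out.println("topic = " + t._3() + ", message = " + t._2());
            }
            return null;
        }
    });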
You can also try the experimental KafkaUtils.createDirectStream, which takes a messageHandler: JFunction[MessageAndMetadata[K, V], R] as a parameter. –
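For reference, an untested sketch of that variant with the Java API (Spark 1.3+). The broker address and the zero starting offsets are placeholders, and fromOffsets needs one entry per partition of every topic, not just partition 0:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker

// This overload requires explicit starting offsets; a real job would restore
// them from ZooKeeper or a checkpoint instead of starting at 0.
Map<TopicAndPartition, Long> fromOffsets = new HashMap<TopicAndPartition, Long>();
for (String t : new String[] { "test", "test1", "test2" }) {
    fromOffsets.put(new TopicAndPartition(t, 0), 0L); // one entry per partition
}

JavaInputDStream<String[]> records = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    String[].class,
    kafkaParams,
    fromOffsets,
    new Function<MessageAndMetadata<String, String>, String[]>() {
        public String[] call(MessageAndMetadata<String, String> mmd) {
            // topic(), partition(), offset(), key() and message() are all available here
            return new String[] { mmd.topic(), mmd.message() };
        }
    });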

1

Starting with the recently released Spark 1.5.0, the official documentation encourages the receiver-less/direct approach, which graduated from experimental status as of 1.5.0. The new Direct API lets you easily obtain messages together with their metadata, among other good things.
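As a rough illustration in Java (the broker address is a placeholder), the simpler createDirectStream overload just takes a set of topic names:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092"); // placeholder broker
Set<String> topicSet = new HashSet<String>(Arrays.asList("test", "test1", "test2"));

// Receiver-less direct stream over the three topics
JavaPairInputDStream<String, String> directStream = KafkaUtils.createDirectStream(
    jssc,
    String.class, String.class,
    StringDecoder.class, StringDecoder.class,
    kafkaParams,
    topicSet);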

I'm using the direct approach and don't understand how to get the message metadata. Can you elaborate? –

@BrandonBradley, follow the link above to the last code snippet in the official documentation. Basically, you have to cast the RDD to HasOffsetRanges once you get hold of it. –
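Continuing the sketch above, that last snippet looks roughly like this in Java: capture the OffsetRange[] inside transform (where the underlying RDD still implements HasOffsetRanges) and read topic()/partition() from it downstream. Note this gives the topic per partition range of each batch, not per individual record; for per-record topics, see the messageHandler variant mentioned under the first answer:

import java.util.concurrent.atomic.AtomicReference;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();

directStream.transformToPair(
    new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
            // The RDD produced by the direct stream implements HasOffsetRanges
            offsetRanges.set(((HasOffsetRanges) rdd.rdd()).offsetRanges());
            return rdd;
        }
    }
).foreachRDD(
    new Function<JavaPairRDD<String, String>, Void>() {
        public Void call(JavaPairRDD<String, String> rdd) {
            for (OffsetRange o : offsetRanges.get()) {
                // Topic and partition for each slice of this batch
                System.out.println(o.topic() + " " + o.partition() + " "
                    + o.fromOffset() + " -> " + o.untilOffset());
            }
            return null;
        }
    });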