2016-11-14 168 views
2

试图从kafka源读取。我想从收到的消息中提取时间戳,以执行结构化的火花流式传输。 卡夫卡(版本0.10.0.0) 火花流(2.0.1版本)从火花流中的kafka消息中提取时间戳吗?

+0

你能显示你当前的代码片段吗? – vanekjar

+0

@vanekjar VAL DS1 =火花 .readStream .format( “卡夫卡”) 。选项( “kafka.bootstrap.servers”, “本地主机:9092”) 。选项( “订阅”, “topicA”) 。加载() – shivali

回答

0

我建议两件事情:

  1. 假设你创建流通过最新Kafka Streaming Api (0.10 Kafka)

    例如你使用依赖:"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1

    比创建了一个流,根据上述文档:

    val kafkaParams = Map[String, Object](
        "bootstrap.servers" -> "broker1:9092,broker2:9092", 
        "key.deserializer" -> classOf[StringDeserializer], 
        "value.deserializer" -> classOf[ByteArrayDeserializer], 
        "group.id" -> "spark-streaming-test", 
        "auto.offset.reset" -> "latest", 
        "enable.auto.commit" -> (false: java.lang.Boolean)) 
    
    val sparkConf = new SparkConf() 
    // suppose you have 60 second window 
    val ssc = new StreamingContext(sparkConf, Seconds(60)) 
    ssc.checkpoint("checkpoint") 
    
    val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, 
    Subscribe[String, Array[Byte]](topics, kafkaParams)) 
    
  2. 你流将是ConsumerRecord[String,Array[Byte]]的DSTREAM,你可以得到一个时间戳和键值就这么简单:

    stream.map { record => (record.timestamp(), record.key(), record.value()) } 
    

希望有所帮助。

0
spark.read 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "your.server.com:9092") 
    .option("subscribe", "your-topic") 
    .load() 
    .select($"timestamp", $"value") 

字段“时间戳”是你在找什么。类型 - java.sql.Timestamp。确保你连接到0.10 Kafka服务器。在早期版本中没有时间戳。 这里描述的字段的完整列表 - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries