试图从kafka源读取。我想从收到的消息中提取时间戳,以执行结构化的火花流式传输。 卡夫卡(版本0.10.0.0) 火花流(2.0.1版本)从火花流中的kafka消息中提取时间戳吗?
2
A
回答
0
我建议两件事情:
假设你创建流通过最新Kafka Streaming Api (0.10 Kafka)
例如你使用依赖:
"org.apache.spark" %% "spark-streaming-kafka-0-10" % 2.0.1
比创建了一个流,根据上述文档:
val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "broker1:9092,broker2:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[ByteArrayDeserializer], "group.id" -> "spark-streaming-test", "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean)) val sparkConf = new SparkConf() // suppose you have 60 second window val ssc = new StreamingContext(sparkConf, Seconds(60)) ssc.checkpoint("checkpoint") val stream = KafkaUtils.createDirectStream(ssc, PreferConsistent, Subscribe[String, Array[Byte]](topics, kafkaParams))
你流将是ConsumerRecord[String,Array[Byte]]的DSTREAM,你可以得到一个时间戳和键值就这么简单:
stream.map { record => (record.timestamp(), record.key(), record.value()) }
希望有所帮助。
0
spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "your.server.com:9092")
.option("subscribe", "your-topic")
.load()
.select($"timestamp", $"value")
字段“时间戳”是你在找什么。类型 - java.sql.Timestamp。确保你连接到0.10 Kafka服务器。在早期版本中没有时间戳。 这里描述的字段的完整列表 - http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#creating-a-kafka-source-for-batch-queries
相关问题
- 1. 火花流功能通过异常火花提交时间
- 2. 要求来自Kafka的两个时间戳之间的消息
- 3. 火花流多输入kafka dstreams混淆
- 4. kafka火花流java api问题
- 5. Boto3从Kinesis流中读取时间戳
- 6. 从消息到达时间戳提取周期
- 7. 从具有时间戳的实木复合地板表中读取的火花
- 8. 火花sql当前时间戳功能
- 9. 从火花提取值RDD
- 10. 从火花管道物流模型中提取变量权重?
- 11. Android中的Alljoyn消息时间戳
- 12. 从kafka消费消息时的异常
- 13. 提取日志RegEX中2个时间戳模式之间的消息
- 14. Flink + Kafka 0.10:如何使用Kafka消息时间戳创建表作为字段?
- 15. MQTT消息时间戳
- 16. 从kafka到hdfs通过火花
- 17. 无法通过火花流消耗卡夫卡消息
- 18. 在Apache Kafka中截取的消息
- 19. 如何从firebase提取聊天消息数据那些时间戳更大的保存时间戳?
- 20. kafka通过时间戳获取记录,消费者循环
- 21. 提取时间戳
- 22. 我可以在火花流中的窗口中获取每个时间间隔的数据吗?
- 23. 火花:火花CSV时间过长
- 24. 如何从Smack 4.1中的实时消息获取服务器时间戳
- 25. 如何从Hubot脚本获取Slack消息的ID /时间戳?
- 26. kafka中的消息路由
- 27. 从时间戳记提取SQL小时
- 28. RabbitMQ收到消息时的时间戳
- 29. 当我使用火花流将消息写入kafka主题时,它只写入一个分区
- 30. 火花时间差
你能显示你当前的代码片段吗? – vanekjar
@vanekjar VAL DS1 =火花 .readStream .format( “卡夫卡”) 。选项( “kafka.bootstrap.servers”, “本地主机:9092”) 。选项( “订阅”, “topicA”) 。加载() – shivali