如何从特定日期的卡夫卡群集中获取消息或数据。例如9月13日,任何人都可以为我提供代码。我用Google搜索了一下,发现只有理论,但我想要的代码从Kafka中检索基于时间戳的数据
回答
没有访问方法这一点。此外,在卡夫卡v0.10
消息没有包含任何时间戳信息之前,因此,不可能知道消息何时被写入主题。
从Kafka v0.10
开始,每条消息都包含一个元数据时间戳属性,该属性或者由生产者在消息创建时设置,或者由代理在消息插入时设置。基于时间的索引已计划,但尚未提供。因此,您需要消耗整个主题并检查时间戳字段(并忽略您不感兴趣的所有消息)。为了找到开始,您还可以对偏移和时间戳进行二进制搜索以更快地找到第一条消息。
更新:
Kakfa 0.10.1
增加了一个基于时间的索引。它允许seek
与时间戳等于或大于给定时间戳的第一条记录。您可以通过KafkaConsumer#offsetsForTime()
使用它。这将返回相应的偏移量,您可以将它们送入KafkaConsumer#seek()
。您只需使用数据并通过ConsumerRecord#timestamp()
检查记录时间戳字段以查看何时可以停止处理。
请注意,该数据是由偏移严格排序,但不是由时间戳。因此,在处理过程中,您可能会得到比您的开始时间戳小时间戳的“迟到”记录(尽管您可以简单地跳过这些记录)。
一个比较棘手的问题是在你的搜索间隔结束迟到纪录,但。在获得第一个时间戳并且搜索间隔时间较长的时间戳之后,可能还会有一些带有时间戳记的记录,这些记录稍后将成为搜索间隔的一部分(如果这些记录已附加到主题“late”)。虽然没有办法知道。因此,您可能希望继续阅读“更多”记录并检查是否存在“迟到”记录。 “一些记录”意味着多少,是您自己需要做出的设计决策。
没有一般原则,但 - 如果你对你的“写入模式”额外的知识,它可以帮助定义一个很好的策略,给你后你的搜索间隔“结束”多少条记录要消费。当然有两种默认策略:(1)停留在第一条记录的时间戳比搜索间隔更长(并且有效地忽略任何迟到的记录 - 如果使用“日志追加时间”配置,这当然是安全策略); (2)你读到日志的末尾 - 这是关于完整性的最安全的策略,但可能导致过高的开销(还要注意,随着记录可以随时附加,并且如果记录“延迟”可以是任意大的,迟到的记录甚至可能在你到达日志结束后追加)。
在实践中,这可能是一个好主意,考虑一个“最大预期延迟”和读取,直到你得到一个纪录比束缚,这延迟上更大的时间戳。
- 1. 数据库时间数据检索,基于时间的查询
- 2. 使用Mysql从数据库中检索时间戳数据
- 3. 基于laravel中的created_at时间戳检索模型5
- 4. 如何从HBase的表检索基于时间的数据
- 5. 基于时间戳
- 6. 基于时间戳
- 7. 从sql数据库中删除。基于时间戳(cron job)?
- 8. mysql查询基于不同的键和时间戳检索行
- 9. cassandra基于时间戳列的时间序列数据清除
- 10. 从firebase数据库检索错误的时间戳
- 11. 从元数据检索MySQL行创建的时间戳?
- 12. 从MySQL检索时间戳数据到C++
- 13. 从数据库检索时间戳记录
- 14. 基于当前时间戳从sqlite数据库中获取数据
- 15. 从hbase行检索时间戳
- 16. 结合基于时间戳
- 17. 基于一个时间戳
- 18. mysql基于时间戳计数?
- 19. 基于logstash中已转换的unix时间戳值的索引
- 20. 检索时间戳之间的PostgreSQL
- 21. 检索和基于时间段显示数据
- 22. 基于组合框选择从数据库中检索数据
- 23. 从火花流中的kafka消息中提取时间戳吗?
- 24. 从C#插入后从mongodb中检索不同的时间戳
- 25. 基于时区偏移量的MySql搜索时间戳
- 26. 基于时间的数据
- 27. Hive:如何根据行时间戳检索数据
- 28. kafka从当前时间获取数据
- 29. 在RTP/RTSP中检索时间戳
- 30. 基于时间戳R的快速数据填充
*“作为卡夫卡v0.10每个消息包含元数据的时间戳属性,要么通过消息创建时间生产者设置,或通过邮件插入时间的经纪人。” * 你能解释一下如何时间戳是由制片人定义的?有没有办法让服务器始终提供时间戳? –
'ProducerRecord'对其构造函数有多个重载;有些接受时间戳参数(long类型)。对于经纪商端时间戳,您需要更改相应的主题配置文件“message.timestamp.type”,参见https://kafka.apache.org/documentation/#topicconfigs –