1
我正在玩Spark Streaming和Kafka(使用Scala API),并且希望通过Spark Streaming从一组Kafka主题中读取消息。kafka和Spark:通过API获取主题的第一个偏移量
下面的方法:
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest")
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
读取卡夫卡到最新的偏移,但不给我,我需要(因为我从一组主题阅读的元数据,我需要为每个消息,我读了这个话题),但这种其他方法KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)
明确希望我没有的偏移量。
我知道有这个shell命令给你最后的偏移量。
kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list <broker>: <port>
--topic <topic-name> --time -1 --offsets 1
和KafkaCluster.scala
是对于曾经是公共开发商,让您正是我想什么的API。
提示?
谢谢@Natalia! 在第一个片段..什么是时间?什么是'时间',什么是'nOffset'? – salvob
哦,我明白了,你从[这里]获得了第一个片段(https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala) – salvob
@salvob这个问题的答案是否正确?如果是这样,请标记为已回答的问题,否则,如果您能告诉我们您是如何解决问题的,我将不胜感激:) – dbustosp