2017-04-07 39 views
1

我正在玩Spark Streaming和Kafka(使用Scala API),并且希望通过Spark Streaming从一组Kafka主题中读取消息。kafka和Spark:通过API获取主题的第一个偏移量

下面的方法:

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "smallest") 
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) 

读取卡夫卡到最新的偏移,但不给我,我需要(因为我从一组主题阅读的元数据,我需要为每个消息,我读了这个话题),但这种其他方法KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Tuple2[String, String]](ssc, kafkaParams, currentOffsets, messageHandler)明确希望我没有的偏移量。

我知道有这个shell命令给你最后的偏移量。

kafka-run-class.sh kafka.tools.GetOffsetShell 
    --broker-list <broker>: <port> 
    --topic <topic-name> --time -1 --offsets 1 

KafkaCluster.scala是对于曾经是公共开发商,让您正是我想什么的API。

提示?

回答

1

您可以从GetOffsetShell.scala kafka API documentation

val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId) 
val topicAndPartition = TopicAndPartition(topic, partitionId) 
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets))) 
val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets 
使用代码

或者你可以用独特的groupId创造新的消费和使用它获得第一偏移

val consumer=new KafkaConsumer[String, String](createConsumerConfig(config.brokerList)) 
consumer.partitionsFor(config.topic).foreach(pi => { 
     val topicPartition = new TopicPartition(pi.topic(), pi.partition()) 

     consumer.assign(List(topicPartition)) 
     consumer.seekToBeginning() 
     val firstOffset = consumer.position(topicPartition) 
... 
+0

谢谢@Natalia! 在第一个片段..什么是时间?什么是'时间',什么是'nOffset'? – salvob

+0

哦,我明白了,你从[这里]获得了第一个片段(https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala) – salvob

+0

@salvob这个问题的答案是否正确?如果是这样,请标记为已回答的问题,否则,如果您能告诉我们您是如何解决问题的,我将不胜感激:) – dbustosp

相关问题