2014-10-01 84 views
4

我想只读火花最新消息使用卡夫卡流,但它也取过去的数据阅读最新的火花卡夫卡流

如何设置auto.offset.reset在KafkaUtil火花

JavaPairReceiverInputDStream<String, String> messages = 
      KafkaUtils.createStream(jssc, args[0], args[1], topicMap); 

如何将conf设置为仅提取当前消息。请举一些例子。

由于提前,也有另一种thread

但不是充分的,请帮助我。提前致谢。

回答

7

您需要使用这种方法从KafkaUtils对象:

def createStream[K, V, U <: Decoder[_], T <: Decoder[_]](
     jssc: JavaStreamingContext, 
     keyTypeClass: Class[K], 
     valueTypeClass: Class[V], 
     keyDecoderClass: Class[U], 
     valueDecoderClass: Class[T], 
     kafkaParams: JMap[String, String], 
     topics: JMap[String, JInt], 
     storageLevel: StorageLevel 
    ) 

根据星火版本,则无法使用Java。有一个bug

如果使用的是星火1.1.0,您需要添加到kafkaParams参数此属性:

“auto.offset.reset”, “最大”

另一个解决方法随机生成一个groupId前缀,但这是蹩脚的。