我想用火花流读取来自卡夫卡的旧信息。但是,我只能够在实时发送邮件时检索邮件(即,如果我填充新邮件,而我的Spark程序正在运行 - 则会收到这些邮件)。apache spark streaming - kafka - 阅读旧信息
我正在改变我的groupID和consumerID,以确保zookeeper不只是不给它知道我的程序已经见过的消息。
假设spark在zookeeper中看到偏移量为-1,它不应该读取队列中的所有旧消息吗?我只是误解了卡夫卡队列的使用方式吗?我对火花和卡夫卡非常陌生,所以我不能排除我只是误解了一些东西。
package com.kibblesandbits
import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import net.liftweb.json._
object KafkaStreamingTest {
val cfg = new ConfigLoader().load
val zookeeperHost = cfg.zookeeper.host
val zookeeperPort = cfg.zookeeper.port
val zookeeper_kafka_chroot = cfg.zookeeper.kafka_chroot
implicit val formats = DefaultFormats
def parser(json: String): String = {
return json
}
def main(args : Array[String]) {
val zkQuorum = "test-spark02:9092"
val group = "myGroup99"
val topic = Map("testtopic" -> 1)
val sparkContext = new SparkContext("local[3]", "KafkaConsumer1_New")
val ssc = new StreamingContext(sparkContext, Seconds(3))
val json_stream = KafkaUtils.createStream(ssc, zkQuorum, group, topic)
var gp = json_stream.map(_._2).map(parser)
gp.saveAsTextFiles("/tmp/sparkstreaming/mytest", "json")
ssc.start()
}
运行此时,我会看到以下消息。所以我相信它不仅仅是没有看到消息,因为偏移量已经设置好了。
14/12/05 13时34分08秒INFO ConsumerFetcherManager: [ConsumerFetcherManager-1417808045047]增加分区提取器 ArrayBuffer([[testtopic,0],initOffset -1促成 ID:1,主机:test-spark02.vpc,port:9092],[[testtopic,1], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092], [[testtopic,2], initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092],[[testtopic,3], initOffset -1 to broker id:1,host:test-spark02.vpc,port: 9092], [[testtopic,4],initOffset -1 to broker id:1,host:test-spark02.vpc,port:9092])
然后,如果我填充1000条新消息 - 我可以看到这1000条消息保存在我的临时目录中。但我不知道如何读取现有的消息,这些消息应该在(现在)成千上万的数字中编号。
仍然没有为我工作,是否有任何其他可能的方式来做到这一点?我在主题中有10k条消息,但只有当我收到新消息时才能检索到它们。如何获取已存储在kafka主题中的数据? – 2017-02-02 07:31:20
“auto.offset.reset” - >“最小”为我工作。另外,根据文档,https://cwiki.apache.org/confluence/display/KAFKA/FAQ,如果您使用0.9版本,它应该是“最早的” – Evgenii 2017-11-29 10:35:56