4

我在写一个从卡夫卡主题读取的Spark(v1.6.0)批处理作业。
为此,我可以使用org.apache.spark.streaming.kafka.KafkaUtils#createRDD但是,我需要为所有分区设置偏移量,并且还需要将它们存储在某个位置(ZK?HDFS?),以了解从何处开始下一批作业。阅读火花批处理作业中的卡夫卡主题

什么是从卡夫卡工作读取正确的方法?

我还想着写一个工作代替,从auto.offset.reset=smallest读取并保存检查点 到HDFS,然后在接下来的运行从启动。

但是在这种情况下,我怎样才能获取一次并在第一批之后停止流式传输

+1

最好分两个问题。 – maasg

回答

4

createRDD是从kafka读取批次的正确方法。

要查询有关最新/最早可用偏移量的信息,请查看KafkaCluster.scala方法getLatestLeaderOffsetsgetEarliestLeaderOffsets。该文件是private,但在最新版本的spark中应该是public

+0

感谢您的答案也在火花名单!你的意思是这些方法给出了关于如何在后端使用适当的存储来实现偏移量查找的指导原则? – Bruckwald

+1

不,这些方法用于从卡夫卡获取最新的可用偏移量。查看https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala查找商店中已提交的偏移量的示例。 –

+0

@CodyKoeninger 0.10版本中不存在KafkaCuster类型。我错过了什么吗?我想批量处理主题中的所有现有记录,但无法从Kafka中检索最早和最新的偏移量。 – InfinitiesLoop