2017-08-28 167 views
2

我正在写一个Spark(v2.2)批处理作业,它从Kafka主题读取。 Spark作业使用cron进行调度。 我不能使用Spark Structured Streaming,因为不支持非基础时间窗口。使用Apache Spark批处理Apache Kafka的偏移管理

val df = spark 
     .read 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "...") 
     .option("subscribe", s"kafka_topic") 

我需要设置的偏移量卡夫卡话题从哪里开始下一个批处理作业就知道了。我怎样才能做到这一点?

回答

1

我想你正在使用KafkaUtils来创建流,你可以传递这个参数。

val inputDStream = KafkaUtils.createDirectStream[String,String](ssc,PreferConsistent, 
          Assign[String, String](fromOffsets.keys,kafkaParams,fromOffsets)) 

希望这有助于!

+0

反对 接受 对于批量查询,不允许使用最新的偏移量。 https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html 我必须传递给Spark Streaming才能从最后一次查询停止的位置恢复新的查询。 – ngi

相关问题