1

我正在使用Scala中的Spark来消费和处理卡夫卡消费者应用程序中的消息。有时,处理来自Kafka消息队列的消息比平时花费更多的时间。那时候我需要消费最新的信息,而忽略那些已经由制片人发行但尚未消费的早期信息。Apache Kafka:如何接收来自Kafka的最新消息?

这里是我的消费者代码:

object KafkaSparkConsumer extends MessageProcessor { 

def main(args: scala.Array[String]): Unit = { 
    val properties = readProperties() 

    val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream") 
    val ssc = new StreamingContext(streamConf, Seconds(1)) 

    val group_id = Random.alphanumeric.take(4).mkString("dfhSfv") 
    val kafkaParams = Map("metadata.broker.list"   -> properties.getProperty("broker_connection_str"), 
         "zookeeper.connect"    -> properties.getProperty("zookeeper_connection_str"), 
         "group.id"      -> group_id, 
         "auto.offset.reset"    -> properties.getProperty("offset_reset"), 
         "zookeeper.session.timeout"  -> properties.getProperty("zookeeper_timeout")) 

    val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
         ssc, 
         kafkaParams, 
         Map("moved_object" -> 1), 
         StorageLevel.MEMORY_ONLY_SER 
        ).map(_._2) 

    msgStream.foreachRDD { x => 
    x.foreach { 
     msg => println("Message: "+msg) 
     processMessage(msg) 
    }  
    }       
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

有没有什么办法,以确保消费者总是能够在消费者应用程序的最新消息?或者我是否需要在卡夫卡配置中设置任何属性以实现相同?

任何帮助,将不胜感激。谢谢

回答

0

连接到Kafka时,您可以随时生成一个新的(随机)组标识 - 这样您在连接时就会开始消费新的消息。

+0

我每次开始执行消费者应用程序时都会生成随机组ID。它以这种方式获取最新消息,但是如果处理需要更多时间,它会继续处理我不需要的旧消息。 – Arjun

2

卡夫卡消费者API包括方法

void seekToEnd(Collection<TopicPartition> partitions) 

从消费所以,你可以得到分配的分区,并寻求他们全部结束。有类似的方法seekToBeginning。

+0

我已经在问题描述中发布了我的代码。你可以看看它,并建议我在哪里添加此方法?谢谢你的回答:) – Arjun

+0

你对auto.offset.reset有什么价值? – Natalia

+0

它已被设置为'最大',自动将该值重置为最大偏移量。 – Arjun

0

您可以享受到两个KafkaConsumer API从分区得到的最后消息(假设日志压缩将不再是一个问题):

  1. public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions):这给了你到底给定分区的偏移。请注意,结束偏移量是下一个要发送的消息的偏移量。
  2. public void seek(TopicPartition partition, long offset):对每个分区运行此操作,并提供从上面调用减去1(假设它大于0)的结束偏移量。
+0

我已经在问题描述中发布了我的代码。你可以看看它,并建议我在哪里添加此方法?谢谢你的回答:) – Arjun

0

是的,您可以将staringOffset设置为最近使用最新消息。

val spark = SparkSession 
    .builder 
    .appName("kafka-reading") 
    .getOrCreate() 

import spark.implicits._ 
val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "localhost:9092") 
     .option("startingOffsets", "latest") 
     .option("subscribe", topicName) 
     .load() 
+0

我已经在问题描述中发布了我的代码。你可以看看它,并建议我在哪里添加你的代码?谢谢你回答:) – Arjun

+0

当你定义kafkaParams时,你需要添加这个属性。 consumer.forcefromstart = false有关更多信息,您可以看到消费者属性。 https://github.com/dibbhatt/kafka-spark-consumer –

+0

好的Mahesh。我需要几天时间才能查看。我会尽快给您回复。再次感谢你。 – Arjun