我正在使用Scala中的Spark来消费和处理卡夫卡消费者应用程序中的消息。有时,处理来自Kafka消息队列的消息比平时花费更多的时间。那时候我需要消费最新的信息,而忽略那些已经由制片人发行但尚未消费的早期信息。Apache Kafka:如何接收来自Kafka的最新消息?
这里是我的消费者代码:
object KafkaSparkConsumer extends MessageProcessor {
def main(args: scala.Array[String]): Unit = {
val properties = readProperties()
val streamConf = new SparkConf().setMaster("local[*]").setAppName("Kafka-Stream")
val ssc = new StreamingContext(streamConf, Seconds(1))
val group_id = Random.alphanumeric.take(4).mkString("dfhSfv")
val kafkaParams = Map("metadata.broker.list" -> properties.getProperty("broker_connection_str"),
"zookeeper.connect" -> properties.getProperty("zookeeper_connection_str"),
"group.id" -> group_id,
"auto.offset.reset" -> properties.getProperty("offset_reset"),
"zookeeper.session.timeout" -> properties.getProperty("zookeeper_timeout"))
val msgStream = KafkaUtils.createStream[scala.Array[Byte], String, DefaultDecoder, StringDecoder](
ssc,
kafkaParams,
Map("moved_object" -> 1),
StorageLevel.MEMORY_ONLY_SER
).map(_._2)
msgStream.foreachRDD { x =>
x.foreach {
msg => println("Message: "+msg)
processMessage(msg)
}
}
ssc.start()
ssc.awaitTermination()
}
}
有没有什么办法,以确保消费者总是能够在消费者应用程序的最新消息?或者我是否需要在卡夫卡配置中设置任何属性以实现相同?
任何帮助,将不胜感激。谢谢
我每次开始执行消费者应用程序时都会生成随机组ID。它以这种方式获取最新消息,但是如果处理需要更多时间,它会继续处理我不需要的旧消息。 – Arjun