kafka-consumer-api

    0热度

    2回答

    我在我的框架中使用了卡夫卡生产者 - 消费者模型。在消费者端消费的记录稍后被索引到elasticsearch上。在这里,我有一个使用案例,如果ES停止运行,我将不得不暂停卡夫卡消费,直到ES启动。一旦启动,我需要恢复使用者并从我最后离开的位置消耗记录。 我不认为这可以通过@KafkaListener来实现。任何人都可以请给我一个这样的解决方案?我发现我需要为此编写自己的KafkaListenerC

    1热度

    1回答

    我正在使用Kafka 0.10.2.1群集。我正在使用Kafka的offsetForTimes API来查找特定的偏移量,并希望在达到结束时间戳时跳出循环。 我的代码是这样的: //package kafka.ex.test; import java.util.*; import org.apache.kafka.clients.consumer.KafkaConsumer; imp

    1热度

    1回答

    我正在建立一个卡夫卡消费者。我已经设置了类似于下面的恢复回调。我已启用手动提交。我如何在恢复回调方法中确认消息,以免发生延迟。 @Bean public ConcurrentKafkaListenerContainerFactory<String, Map<String, Object>> kafkaListenerContainerFactory() { Concurren

    0热度

    1回答

    我目前正在使用R包kafka - rkafka。每当我从卡夫卡读取时,我都会收到“信息:记得在完成阅读信息后关闭消费者”。我如何避免这种情况? >library(rkafka) >consumer<-rkafka.createConsumer("1.2.3.4:2181","Real-time-data", consumerTimeoutMs = "-1") >rkafka.read(cons

    1热度

    1回答

    上述应从单个主题消费的卡夫卡消费者。因为我整合卡夫卡消费者API与弹簧芯web应用程序我不能使用弹簧启动.. Spring的XML配置如下 <bean id="kafkaConsumerProperties" class="com.azuga.kafka.listeners.KafkaConsumerProperties"> <constructor-arg type="java.lan

    1热度

    1回答

    我正在使用Spring Kafka使用者读取关于Kafka主题的消息。我坚持Oracle数据库。无论何时出现数据库连接错误,我想执行重试。我正在使用Spring JDBC连接到Oracle DB。如果需要仅重试JDBC连接问题,那么需要添加哪些异常类列表。 private static Map<Class<? extends Throwable>, Boolean> retryableExcept

    0热度

    1回答

    我需要处理产品ID的顺序,并计划使用卡夫卡为此,现在如果数据丢失从卡夫卡或我的代码我有所有这些产品ID在可能数据库,如果记录没有在给定的时间内处理可以说24小时我需要重新发布他们在一个队列中,但优先考虑,因为kafka没有队列中的数据的优先概念我可以有另一个队列,可以充当优先队列。 我面临的问题是我需要在优先队列中排序产品。因此,如果我已经在基于散列的分区中进行分发,并且我的客户再次处理消息,即为

    0热度

    1回答

    我想在一个Spring引导应用程序中创建多个Kafka使用者组以处理不同的Kafka队列。需求场景基于消息的临界性,它应该推送到不同的Kafka队列。为了管理不同的卡夫卡队列,我想创建一个专门的卡夫卡用户组。但我不确定在一次春季启动应用程序中是否可以创建多个卡夫卡消费群。 目前我有三个卡夫卡主题,每个主题有4个分区,只有一个卡夫卡消费者组和三个卡夫卡消费者。这三名卡夫卡消费者从三个专用卡夫卡队列中

    1热度

    1回答

    我正在建设一个春季卡夫卡消费者。我已经设置了重试机制。重试完毕后,我想将失败的消息推送给死信主题。 Listen方法具有以下参数 public void listen(@Payload Map<String, Object> conciseMap, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, Ackn

    1热度

    1回答

    我正在调查kafka溪流。我想过滤我的流,使用选择性非常低的过滤器(几千个之一)。我正在看这种方法: https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate) 但我找不到任何证据,如果过滤