kafka-consumer-api

    0热度

    1回答

    我正在研究一个需要从Kafka系统读取然后触发事件的企业Web应用程序。任何人都可以提出一种方法来获得偏移量,也是存储偏移量的理想方法(理想方式应该能够处理应用程序的多个实例的访问)? 注意: - 我使用spring-kafka并打开任何进一步的建议。 在此先感谢。

    0热度

    2回答

    我们正在使用Spring Kafka实现卡夫卡消费者。正如我理解正确的话单信息的处理失败,有选项 不关心,只是ACK 不要使用一些重试处理一个RetryTemplate 如果连这不起作用做一些自定义故障处理使用RecoveryCallback 我想知道你的最佳做法是什么。我想到了简单的应用程序异常,例如DeserializationException(用于JSON格式的消息)或更长的本地存储停机时

    0热度

    2回答

    我有一个分区主题,它有X分区。 截至目前,在生成消息时,我创建了仅指定topic和value的卡夫卡ProducerRecord。我没有定义key。 据我所知,我的消息将使用默认的内置分区器在分区间均匀分配。 另一方面,我有一个卡夫卡消费者的线程池。每个卡夫卡消费者都将在自己的专用线程中运行消耗该主题的消息。这些消费者中的每一个都被赋予相同的group.id。这将允许消费并行消息。每个消费者将被分

    3热度

    1回答

    我正在浏览卡夫卡的消费者配置。 https://kafka.apache.org/documentation/#newconsumerconfigs 是什么将触发一个再平衡的参数?例如下面的参数将会是?任何其他我们需要更改或默认的参数都可以满足要求 connections.max.idle.ms在此配置指定的毫秒数后关闭空闲连接。长540000中 另外,我们有三个不同的主题 它是一个坏主意,有相同

    1热度

    1回答

    我们有v0.8.2的消费者,我们正在更新到v0.10.2。这些是大量应用程序,所以我们通常会进行滚动更新。问题在于,v8消费者在向v10消费者提交kafka时向动物园管理员提交补偿。在我们的测试中,尝试运行混合使用v8和v10的消费者会导致消息被双重使用。在这里看到更详细的问题的写法:http://www.search-hadoop.com/m/Kafka/uyzND1ymwxk17UWdj?su

    2热度

    3回答

    我创建其中前端服务推消息到卡夫卡请求“主题并监听另一“响应”主题对于一些下游后端消费者(实际上是一个复杂的系统中的系统,该系统最终推回卡夫卡),对“请求”消息进行处理,最终推到“响应”主题。 我想弄清楚最优雅的方式,以确保消费者侦听适当的分区并接收响应,并且后端推到前端使用者正在侦听的分区。我们总是需要确保响应发送给产生初始消息的相同消费者。 我有两个解决方案,截至目前,但也不是特别令人满意。任何

    0热度

    1回答

    可以来自不同组的两位消费者从同一主题和分区x中读取并且写入另一个主题和分区y? 我会实施消费者策略,其中一位消费者放弃其他消费者处理的数据。 生产者将数据保存到共享分区的顺序并不重要。 我只是想知道这是否可能

    0热度

    1回答

    我一直在kafka工作了六个月,我有一些关于消费者滞后和数据存储到主题中的分区的问题。 问题编号1: 最初,当我开始阅读关于kafka并了解如何使用功能的知识时,我在循环中教导了一个部分和一个复制因素的主题会产生奇迹。经过了六个月的工作后,我的项目转变为生活消费者,消费者从主题中消耗我的信息开始给我一个滞后。我阅读了许多关于消费者滞后的堆栈溢出答案,并得出结论,如果我增加了我的分区,并且复制因子对

    2热度

    1回答

    我正在研究kafka用例,我需要在生产者&消费者一方拥有事务性语义。我能够使用kafka事务API 0.11将事务性消息发布到kafka集群,但消费者我现在面临这个问题身边...我已isolation.level=read_committed属性文件中,但我不能消耗it..I可以看到邮件被消耗与isolation.level=read_uncommitted但这不希望.. 生产者代码 packag

    1热度

    1回答

    我必须创建一个不断监听和数据推到Database的Kafka topics的消费者一个周期。 这里的要求是: - 如果你碰巧在一个循环中从Kafka读取多个记录,试着把它作为一个单独的调用进入db而不是多个。 public static void kafkaConsumer(String topicName, String groupId, String autoOffsetReset,