3

我一直在与卡夫卡合作,对消费者群体下的消费者有点混淆。混淆的核心是是否将消费者实施为流程或线程。对于这个问题,假设我正在使用高级消费者。卡夫卡消费者 - 消费者流程和主题分区与主题分区的关系如何?

让我们考虑一个我已经尝试过的场景。在我的话题中有2个分区(为了简单起见,我们假设复制因子只是1)。我用group1组创建了一个消费者(ConsumerConnector)进程consumer1,然后创建了大小为2的主题计数映射,然后在该进程下生成了2个消费者线程consumer1_thread1consumer1_thread2。它看起来像consumer1_thread1正在消耗分区0consumer1_thread2正在消耗分区1。这种行为总是确定性的吗?以下是代码片段。类TestConsumer是我的消费者线程类。

... 
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 
    topicCountMap.put(topic, new Integer(2)); 
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); 
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); 

    executor = Executors.newFixedThreadPool(2); 

    int threadNumber = 0; 
    for (final KafkaStream stream : streams) { 
     executor.submit(new TestConsumer(stream, threadNumber)); 
     threadNumber++; 
    } 
    ... 

现在,让我们考虑另一种情况下(我还没有尝试,但很好奇),我开始2消费者处理consumer1consumer2都具有相同群组group1和他们每个人是一个单线程过程。现在我的问题是:

  1. 在这种情况下,两个独立的消费者过程(在同一组下)如何与分区相关?它与上面的单进程多线程场景有什么不同?

  2. 一般来说,消费者线程或进程如何映射/关联到主题中的分区?

  3. 卡夫卡文档确实说消费者组下的每个消费者都会消费一个分区。但是,它是指消费者线程(如我的上面的代码示例)还是独立的消费者进程?

  4. 有没有什么微妙的东西我在这里丢失关于实现消费者作为进程vs线程?提前致谢。

回答

7

消费者组可以运行多个消费者实例(多个进程使用相同的group-id)。在使用时,每个分区仅由组中的一个使用者实例消耗。

E.g.如果您的主题包含2个分区,并且您使用2个消费者实例启动消费者组group-A,则其中每个消费者组都将使用该主题的特定分区中的消息。

如果您启动具有不同组ID的相同2消费者group-A & group-B那么来自两个主题分区的消息将被广播到它们中的每一个。所以在这种情况下,在group-A下运行的消费者实例将具有来自这两个主题分区的消息,并且对于group-B也是如此。

了解更多关于这对他们documentation

编辑:基于您的评论它说,

我想知道是什么具有在相同的2个消费者线程之间的有效差异过程而不是2个消费过程(在两种情况下组相同)

消费者group-id在群集中是相同/全局的。假设你已经启动了具有2个线程的进程,然后产生另一个进程(可能在不同的机器上),而同一个groupId有2个线程,那么kafka将添加这2个新线程以使用来自主题的消息。所以最终会有4个线程负责从同一主题中消费。然后Kafka将触发重新分配以将分区重新分配给线程,所以可能发生的情况是线程T1 of process P1正在使用的特定分区可能被分配为被线程T2 of process P2占用。以下几行摘自维客页面

当一个新进程启动时使用相同的使用者组名称时,Kafka会将进程的线程添加到可用于使用该主题并触发'再平衡”。在此重新平衡过程中,Kafka会将可用分区分配给可用线程,可能会将分区移至另一个进程。如果您混合使用新旧业务逻辑,则可能会有一些消息转到旧逻辑。

+0

我明白你的意思。但是,我最初的问题主要关注的是线程与进程。我想知道在同一个进程下有两个消费者线程,而不是两个消费者进程(在这两种情况下,组是相同的)有什么区别? –

+1

@AsifIqbal更新了我的回答 – user2720864

+0

感谢您的编辑。这真的回答了我的好奇心。与此同时,我对多个流程做了一些进一步的实验,并看到重新平衡正在发生:)。 –

1

选择具有相同标识的多个消费者组实例与单个消费者组实例的主要设计决策是弹性。例如,如果您的单个消费者拥有两个线程,那么如果此机器停机,您将失去所有消费者。如果您有两个具有相同标识的独立消费者组,每个不同主机上的消费者组都可以承受失败。理想情况下,每个用户组在上面应该有两个线程,因此如果一个主机出现故障,另一个用户组使用其休眠线程占用另一个分区。事实上,总是希望拥有比分区多的线程来覆盖这个因素。

  1. 您可以在不同的主机上运行每个使用者组。对于一个给定的名称/ ID,只有一个用户组可以在单个主机上运行,​​因为它在单个运行时环境中管理其所有线程。
  2. Kafka有一个算法来确定哪些线程/使用者组读取各个主题分区。卡夫卡试图以弹性的方式平均分配这些内容。当使用者组发生故障时,它使其他组中的其他线程能够读取给定的分区。
  3. 指用户组中的单个线程。如果线程数多于分区数,那么其中一些线程将保持休眠状态,直到其他线程无法提供弹性。
  4. 首选项与韧性有关。因此,对于多个使用相同ID的消费者组设置,我可以在多台主机上运行,​​使我的应用程序可以容忍失败。
+0

答案中有很多不正确的东西。首先,您可以使用单个使用者组运行多个使用者主机。偏移在组级别上进行跟踪,因此使用组来避免组内的线程跨越彼此的脚趾。即如果你设置了2个主机和2个不同的组来获取相同的主题,那么最终消息数量将增加一倍。 –

+0

第二件事是,卡夫卡没有确定线程何时失败的算法,您需要手动重新生成失败线程,否则即使对于基于组的高级用户,该分区也不会被使用。我用4个分区和8个线程测试了这个确切的事情,并且属于失败线程的分区只是保持未消耗(即它的偏移量不会移动,直到线程重新生成为止)。 –

+0

所以如果你有4个分区并产生8个消费者线程,4个线程将不会做任何事情。另一方面,如果你产生了2个线程,每个线程将从2个分区中消耗。 –

相关问题