2013-07-31 137 views
5

我是卡夫卡新手,我对消费者的理解是基本上有两种类型的实现。
1)The High level consumer/consumer group
2)Simple ConsumerApache Kafka消费群体和简单消费者

对高层次的抽象最重要的部分是,当卡夫卡不关心处理它使用的,而简单的消费提供了偏移管理更好地控制所抵消。令我迷惑的是如果我想在多线程环境中运行消费者,并且想要控制偏移量。如果我使用消费者组,这是否意味着我必须从存储在zookeeper中的最后一个偏移量中读取?这是我拥有的唯一选择。

回答

6

绝大多数情况下,高级消费者API不会让您直接控制偏移量。

首次创建使用者组时,可以通过使用auto.offset.reset属性来确定是否以kafka存储的最早或最新消息开始。

您还可以通过将auto.commit.enable设置为false来控制何时高级别消费者向动物园管理员提交新的偏移量。

由于高级消费者将偏移量存储在zookeeper中,您的应用程序可以直接访问zookeeper并操纵偏移量 - 但它将超出高级消费者API。

你的问题有点混乱,但你可以在多线程环境中使用简单的使用者。这就是高级消费者所做的。

0

在Apache Kafka 0.9和0.10中,消费者组管理由Broker(用于协调)和主题(用于状态存储)完全在Kafka应用程序内处理。

当消费者组第一预订主题的auto.offset.reset的设置确定,消费者开始消耗消息(http://kafka.apache.org/documentation.html#newconsumerconfigs

当特定消费者分配主题/分区可以将ConsumerRebalanceListener注册以接收通知。

消费者运行后,可以使用seekseekToBeginningseekToEnd从特定偏移量中获取消息。 seek会影响消费者下一poll,并保存在下一提交(如commitSynccommitAsync或当auto.commit.interval流逝,如果启用。)

消费者的javadoc提及更具体的情况:http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

您可以将卡夫卡提供的组管理与通过查询(..)手动管理偏移进行组合,一旦分区分配完成。