2017-04-05 230 views
1

该实现是在Python中实现的。使用confluent_kafka。卡夫卡消费者中的控制消息偏移量

我有一个消费者对象来轮询来自kafka主题的消息。这些消息用于其他大型对象的进一步处理,并且由于大小而无法在每次消息处理后备份对象。

我周期性地转储对象,然后手动提交使用者。以下是我实施的示例代码。

from confluent_kafka import Consumer, KafkaError, TopicPartition 

c = Consumer({ 
    'bootstrap.servers': 'myserver', 
    'group.id': 'mygroup', 
    'default.topic.config': {'auto.offset.reset': 'smallest'}, 
    'enable.auto.commit': "false" 
}) 
c.subscribe(['mytopic']) 

offsets = {} 

for i in range(10): 
    msg = c.poll() 

    if msg.error(): 
     continue 

    par = msg.partition() 
    off = msg.offset() 
    offsets[p] = off 

c.commit(async=False) 

print(offsets) 

当我运行该代码的第二时间,我期望消息偏移量,如果来自相同分区,应该是下一个,即1,与上印偏移。

但是补偿提前了很多。数百个。

我也试图手动分配位置如下:

lst_part = [] 

for par, off in offsets.items(): 
    lst_part.append(TopicPartition('mytopic', par, off)) 

c.assign(lst_part) 

# then start polling messages 

新轮询消息不是已分配的偏移+ 1

回答

1

c.commit(async=False)将承诺针对其信息已被全部耗尽分区通过poll()呼叫从客户端返回到应用程序。

如果你想更精细的控制可冲抵提交你可以传递一个明确的[TopicPartition(..)]列表commit()(请务必提交last_message_offset + 1)或禁止auto.offset.store和显式调用store_offsets()您要的信息/偏移为未来commit()打电话。

请注意,store_offsets()仅适用于master,并且在confluent-kafka-python客户端的发布版本中尚不可用,但很快会出现。

相关问题