2016-04-03 51 views
0

我试着通过设定偏移,但得到断言错误消耗从主题数据 -Asse田:未分配的分区

from kafka import KafkaConsumer 

consumer = KafkaConsumer('foobar1', 
         bootstrap_servers=['localhost:9092']) 
print 'process started' 
print consumer.partitions_for_topic('foobar1') 
print 'done' 
consumer.seek(0,10) 

for message in consumer: 
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, 
              message.offset, message.key, 
              message.value)) 
print 'process ended' 

错误: -

Traceback (most recent call last): 
    File "/Users/pn/Documents/jobs/ccdn/kafka_consumer_1.py", line 21, in <module> 
    consumer.seek(0,10) 
    File "/Users/pn/.virtualenvs/vpsq/lib/python2.7/site-packages/kafka/consumer/group.py", line 549, in seek 
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition' 
AssertionError: Unassigned partition 

回答

1

你必须调用consumer.assign()在调用seek之前列出TopicPartition。 另请注意,seek的第一个参数也是TopicPartition。 见KafkaConsumer API

0

在我的情况与Kafka 0.9kafka-python,分区分配中for message in consumer发生。所以,迭代之后应该寻求操作。我重置我的组的偏移由以下代码:

import kafka 

ps = [] 
for i in xrange(topic_partition_number): 
    ps.append(kafka.TopicPartition(topic, i)) 

consumer = kafka.KafkaConsumer(topic, bootstrap_servers=address, group_id=group) 
for msg in consumer: 
    print msg 
    consumer.seek_to_beginning(*ps) 
    consumer.commit() 
    break