2016-09-15 33 views
7

我正在使用带有通配符的模式订阅Kafka,如下所示。通配符代表一个动态的客户ID。如何使用kafka-python订阅多个kafka通配符模式的列表?

consumer.subscribe(pattern='customer.*.validations') 

这很好,因为我可以从主题字符串中摘取客户ID。但是现在我需要扩展功能来倾听类似的话题,以达到略有不同的目的。我们称之为customer.*.additional-validations。代码需要存在于同一个项目中,因为共享了很多功能,但是我需要能够根据队列类型采取不同的路径。

Kafka documentation我可以看到有可能订阅一组主题。但是,这些是硬编码的字符串。不是允许灵活性的模式。

>>> # Deserialize msgpack-encoded values 
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads) 
>>> consumer.subscribe(['msgpackfoo']) 
>>> for msg in consumer: 
...  assert isinstance(msg.value, dict) 

所以我想知道是否有可能以某种方式做两者的组合?有点像这个(非工作):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations']) 

回答

8

在KafkaConsumer代码,它支持的主题列表,或图案,

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

def subscribe(self, topics=(), pattern=None, listener=None): 
     """Subscribe to a list of topics, or a topic regex pattern 
     Partitions will be dynamically assigned via a group coordinator. 
     Topic subscriptions are not incremental: this list will replace the 
     current assignment (if there is one). 

所以,你可以创建一个正则表达式,OR条件使用|,这应该像订阅多个动态主题regex一样工作,因为它在内部使用re模块进行匹配。

(customer.*.validations)|(customer.*.additional-validations)

+0

谢谢!这个语法很好。 –