2016-10-02 26 views
0

我是Twisted的新手,这是我的第一个程序。扭曲的python从kafka读取并写入elasticsearch

我无法从kafka-python库中找到使用KafkaConsumer的方法,并使用treq来触发对elasticsearch的发布请求。

我能在小块分解问题: 创建卡夫卡消费迭代器,并从中读取数据(的话题可能是巨大的)

def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     v.value 

后使用TREQ

def post(self): 
    d = treq.post('http://es:9200/pro/pr/', self.data) 
    d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x)) 

到elasticsearch启动反应堆

from twisted.internet import reactor 
reactor.callWhenRunning(consumeKafka) 
reactor.run() 

任何想法如何做这个工作?

回答

0

我根本不使用卡夫卡,所以我不确定这是否适合您。另外,我假设你在同时运行Kafka和treq时遇到了麻烦。我在Twisted中处理迭代器的一种通用方法是使用inlineCallbacks来等待结果,然后用结果做一些事情。

from twisted.internet import defer 

@defer.inlineCallbacks 
def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     value = yield v.value 
     # do stuff with value 

然后,你可以简单地调用这个函数,反应堆将负责其余部分。所以,你的主要部分看起来就像这样:

consumeKafka() 
reactor.run() 

注意,consumeKafka()函数返回一个Deferred,以便根据需要添加回调和errbacks。一旦您对此模型感到满意,请查看Cooperator对象以获取更多功能。

+1

感谢您的回答,我仍然试图让它工作,但我想我需要先阅读扭曲的文档。这并不容易。 – rolele