0
我是Twisted的新手,这是我的第一个程序。扭曲的python从kafka读取并写入elasticsearch
我无法从kafka-python库中找到使用KafkaConsumer的方法,并使用treq来触发对elasticsearch的发布请求。
我能在小块分解问题: 创建卡夫卡消费迭代器,并从中读取数据(的话题可能是巨大的)
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
v.value
后使用TREQ
def post(self):
d = treq.post('http://es:9200/pro/pr/', self.data)
d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
到elasticsearch启动反应堆
from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
任何想法如何做这个工作?
感谢您的回答,我仍然试图让它工作,但我想我需要先阅读扭曲的文档。这并不容易。 – rolele