我正在学习AMQP,所以这可能是我在做什么的误解。我正尝试在我自己的私人服务器上设置rabbitmq,以便为系统服务器提供服务(我有一堆需要处理的图像)。我设置了三步管道:AMQP/RabbitMQ多个消费者,但只有一个geting工作?
| put image on queue | --work_queue--> | process | --results--> | archive results |
我安装了rabbitmq,并在服务器上创建了两个队列。 'work_queue'和'结果'。我使用amqplib编写了一些python脚本,并且使用一个图像处理器工作程序,我的管道工作得很好。我已将100张图片添加到队列中,并且我的一台机器正在愉快地抓取一张图片,并将数据生成并将结果放入结果队列中。
问题是,我假设如果我开始另一台机器上的另一个图像处理工作,它会简单地将工作从队列中拉出。这似乎是rabbitmq站点上“工作队列”教程中列出的确切情况。我预计这只是工作。然而,真正发生的事情是,无论我开始有多少其他工作人员,他们都会永远等待,并且永远不会有任何工作,即使work_queue中有很多消息在等待。
我误解了什么?相关代码队列服务器上的工作:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
....
msg = { 'filename': os.path.basename(i) }
chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='',
routing_key='work_queue')
而且在过程中工人消费端:
from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="foo:5672 ", userid="pipeline",
password="XXXXX", virtual_host="/", insist=False)
chan = conn.channel()
def work_callback(msg):
....
while True:
chan.basic_consume(callback=work_callback, queue='work_queue')
try:
chan.wait()
except KeyboardInterrupt:
print "\nExiting cleanly"
sys.exit(0)
我可以看到其他工人正在连接:
$ sudo rabbitmqctl list_queues
Listing queues ...
results 0
work_queue 246
...done.
$ sudo rabbitmqctl list_connections
Listing connections ...
pipeline 192.168.8.1 41553 running
pipeline XX.YY.ZZ.WW 46676 running
pipeline 192.168.8.4 44482 running
pipeline 192.168.8.6 41884 running
...done.
哪里XX.YY.ZZ.WW是一个外部IP。 192.168.8.6的工作人员在队列中搅动,但外部IP上的工作人员在那里闲置,而有246个消息在队列中等待其应该等待的时间。
想法?
迷人的。如果两个“工人”在我提交工作,然后两个工作。或者甚至更加狡猾,如果只有一个人在提交工作时被连接,就好像所有的工作都安排在那个工人那里......如果另一个工人连接,它只能工作IFF我杀死原来的工人......然后新的一个人获得大约一半的工作?如果我重新连接原来的一个,它会得到剩下的一半?离奇! – clemej
我希望将工作放在一个队列中,并且每个工作人员连接到队列并一次抓取第一个条目。它看起来相反,它会得到所有的工作。也许我不明白队列和渠道? – clemej