2013-08-30 139 views
1

我正在学习AMQP,所以这可能是我在做什么的误解。我正尝试在我自己的私人服务器上设置rabbitmq,以便为系统服务器提供服务(我有一堆需要处理的图像)。我设置了三步管道:AMQP/RabbitMQ多个消费者,但只有一个geting工作?

| put image on queue | --work_queue--> | process | --results--> | archive results | 

我安装了rabbitmq,并在服务器上创建了两个队列。 'work_queue'和'结果'。我使用amqplib编写了一些python脚本,并且使用一个图像处理器工作程序,我的管道工作得很好。我已将100张图片添加到队列中,并且我的一台机器正在愉快地抓取一张图片,并将数据生成并将结果放入结果队列中。

问题是,我假设如果我开始另一台机器上的另一个图像处理工作,它会简单地将工作从队列中拉出。这似乎是rabbitmq站点上“工作队列”教程中列出的确切情况。我预计这只是工作。然而,真正发生的事情是,无论我开始有多少其他工作人员,他们都会永远等待,并且永远不会有任何工作,即使work_queue中有很多消息在等待。

我误解了什么?相关代码队列服务器上的工作:

from amqplib import client_0_8 as amqp 
conn = amqp.Connection(host="foo:5672 ", userid="pipeline", 
    password="XXXXX", virtual_host="/", insist=False) 
chan = conn.channel() 

.... 

msg = { 'filename': os.path.basename(i) } 

chan.basic_publish(amqp.Message(json.dumps(msg)), exchange='', 
     routing_key='work_queue') 

而且在过程中工人消费端:

from amqplib import client_0_8 as amqp 
conn = amqp.Connection(host="foo:5672 ", userid="pipeline", 
    password="XXXXX", virtual_host="/", insist=False) 
chan = conn.channel() 

def work_callback(msg): 
.... 

while True: 
    chan.basic_consume(callback=work_callback, queue='work_queue') 
    try: 
     chan.wait() 
    except KeyboardInterrupt: 
     print "\nExiting cleanly" 
     sys.exit(0) 

我可以看到其他工人正在连接:

$ sudo rabbitmqctl list_queues 
Listing queues ... 
results 0 
work_queue  246 
...done. 

$ sudo rabbitmqctl list_connections 
Listing connections ... 
pipeline  192.168.8.1  41553 running 
pipeline  XX.YY.ZZ.WW  46676 running 
pipeline  192.168.8.4  44482 running 
pipeline  192.168.8.6  41884 running 
...done. 

哪里XX.YY.ZZ.WW是一个外部IP。 192.168.8.6的工作人员在队列中搅动,但外部IP上的工作人员在那里闲置,而有246个消息在队列中等待其应该等待的时间。

想法?

+0

迷人的。如果两个“工人”在我提交工作,然后两个工作。或者甚至更加狡猾,如果只有一个人在提交工作时被连接,就好像所有的工作都安排在那个工人那里......如果另一个工人连接,它只能工作IFF我杀死原来的工人......然后新的一个人获得大约一半的工作?如果我重新连接原来的一个,它会得到剩下的一半?离奇! – clemej

+0

我希望将工作放在一个队列中,并且每个工作人员连接到队列并一次抓取第一个条目。它看起来相反,它会得到所有的工作。也许我不明白队列和渠道? – clemej

回答

0

对不起回答我自己的问题,但我终于得到了我一直在寻找的行为。我需要将QoS prefetch_count设置为1.显然,通过不设置prefetch_count,只要一个客户端连接,所有200+个消息都被传送到其“通道”并从队列中排出,因此除非另有其他人看到任何工作原来通过增加工作断开(从而关闭通道,并把消息发回队列

:。

chan.basic_qos(0,1,False) 

我的工人,他们现在一次只抢到一个消息不正是我预计会发生,但其现在的工作正如预期的那样。

相关问题