2012-07-25 114 views
2

我想要做这样的事情(1个队列,并在多个用户):多个消费者,是否可以克隆队列(gevent)?

import gevent 
from gevent import queue 

q=queue.Queue() 
q.put(1) 
q.put(2) 
q.put(3) 
q.put(StopIteration) 

def consumer(qq): 
    for i in qq: 
     print i 

jobs=[gevent.spawn(consumer,i) for i in [q,q]] 

gevent.joinall(jobs) 

但是这是不可能的......队列由JOB1消耗...所以作业2将永远阻塞。 它给了我例外gevent.hub.LoopExit: This operation would block forever

我会每个消费者将能够从开始消耗完整的队列。 (应该显示1,2,3,1,2,3或1,1,2,2,3,3 ...从不知道)

一个想法应该是在产卵前克隆队列,但这是不可能的使用复制(浅/深)模块;-(

是否有另一种方式来做到这一点?

[编辑] 你觉得什么?

import gevent 
from gevent import queue 

class MasterQueueClonable(queue.Queue): 
    def __init__(self,*a,**k): 
     queue.Queue.__init__(self,*a,**k) 

     self.__cloned = [] 
     self.__old=[] 

    #override 
    def get(self,*a,**k): 
     e=queue.Queue.get(self,*a,**k) 
     for i in self.__cloned: i.put(e) # serve to current clones 
     self.__old.append(e)    # save old element 
     return e 

    def clone(self): 
     q=queue.Queue() 
     for i in self.__old: q.put(i) # feed a queue with elements which are out 
     self.__cloned.append(q)   # stock the queue, to be able to put newer elements too 
     return q 

q=MasterQueueClonable() 
q.put(1) 
q.put(2) 
q.put(3) 
q.put(StopIteration) 

def consumer(qq): 
    for i in qq: 
     print id(qq),i 

jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]] 
gevent.joinall(jobs) 

它是基于这样的理念RyanYe。有一个没有调度员的“主队列”。 我的主队列覆盖GET方法,并可以派发到按需克隆。 此外,可以在开始masterqueue之后创建“克隆”(使用__old技巧)。

回答

2

我建议你创建一个greenlet将工作分发给消费者。示例代码:

import gevent 
from gevent import queue 

master_queue=queue.Queue() 
master_queue.put(1) 
master_queue.put(2) 
master_queue.put(3) 
master_queue.put(StopIteration) 

total_consumers = 10 
consumer_queues = [queue.Queue() for i in xrange(total_consumers)] 

def dispatcher(master_queue, consumer_queues): 
    for i in master_queue: 
     [j.put(i) for j in consumer_queues] 
    [j.put(StopIteration) for j in consumer_queues] 

def consumer(qq): 
    for i in qq: 
     print i 

jobs=[gevent.spawn(dispatcher, q, consumer_queues)] + [gevent.spawn(consumer, i) for i in consumer_queues] 
gevent.joinall(jobs) 

更新:修复了消费者队列丢失的StopIteration。谢谢你指出。

+0

当然......麻烦的是,如果有在master_queue中传输的“50Mb”...... consumer_queues中将会出现“500Mb”(50 * 10)。 如果需要,我想按需创建消费者队列。 – manatlan 2012-07-25 15:30:52

+1

我想提两点1)我们谈论的是10倍的引用而不是10倍的实际数据字节。如果每个项目都是一个复杂的对象,这是一个巨大的差异。 2)我们正在讨论'10倍内存访问'而不是'10倍内存空间'。在大多数情况下,这不会是瓶颈。实际上,由于每个消费者队列需要访问主队列中的所有项目。因此,'10x内存访问'是不可避免的。 – 2012-07-25 15:45:54

+0

你是对的......但在我的情况下,主队列将包含mp3流的chunck。我需要“克隆”,因为很多“mp3 http clients”请求很多次流(获取信息,大小,标签,格式...)... – manatlan 2012-07-26 07:58:49

0

在由Ryan烨一行答案是错过了在调度()函数的末尾: [j.put(StopIteration异常),用于consumer_queues J] 没有它,我们仍然可以得到“gevent.hub.LoopExit:这个操作会永远阻塞',因为'我在master_queue'循环不会将StopIteration异常复制到consumer_queues中。

(对不起,我不能发表评论还没有,所以我把它写成一个separete答案。)

+0

谢谢,修复了我的帖子中的代码。 – 2012-07-26 13:08:25

1

我已经addedcopy()方法队列类:

>>> import gevent.queue 
>>> q = gevent.queue.Queue() 
>>> q.put(5) 
>>> q.copy().get() 
5 
>>> q 
<Queue at 0x1062760d0 queue=deque([5])> 

让我知道,如果它帮助。

+0

谢谢,我相信它错过了这个功能。这是一个很好的补充!但在我的情况下(我现在无法测试;-(),如果在“.copy”之后的'q'中添加另一个元素会发生什么......此元素是否也添加到复制版本? – manatlan 2012-07-26 11:29:22

+0

no,a副本当然是独立的。 – 2012-07-26 13:09:50

相关问题