我想要做这样的事情(1个队列,并在多个用户):多个消费者,是否可以克隆队列(gevent)?
import gevent
from gevent import queue
q=queue.Queue()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print i
jobs=[gevent.spawn(consumer,i) for i in [q,q]]
gevent.joinall(jobs)
但是这是不可能的......队列由JOB1消耗...所以作业2将永远阻塞。 它给了我例外gevent.hub.LoopExit: This operation would block forever
。
我会每个消费者将能够从开始消耗完整的队列。 (应该显示1,2,3,1,2,3或1,1,2,2,3,3 ...从不知道)
一个想法应该是在产卵前克隆队列,但这是不可能的使用复制(浅/深)模块;-(
是否有另一种方式来做到这一点?
[编辑] 你觉得什么?
import gevent
from gevent import queue
class MasterQueueClonable(queue.Queue):
def __init__(self,*a,**k):
queue.Queue.__init__(self,*a,**k)
self.__cloned = []
self.__old=[]
#override
def get(self,*a,**k):
e=queue.Queue.get(self,*a,**k)
for i in self.__cloned: i.put(e) # serve to current clones
self.__old.append(e) # save old element
return e
def clone(self):
q=queue.Queue()
for i in self.__old: q.put(i) # feed a queue with elements which are out
self.__cloned.append(q) # stock the queue, to be able to put newer elements too
return q
q=MasterQueueClonable()
q.put(1)
q.put(2)
q.put(3)
q.put(StopIteration)
def consumer(qq):
for i in qq:
print id(qq),i
jobs=[gevent.spawn(consumer,i) for i in [q.clone(), q ,q.clone(),q.clone()]]
gevent.joinall(jobs)
它是基于这样的理念RyanYe。有一个没有调度员的“主队列”。 我的主队列覆盖GET方法,并可以派发到按需克隆。 此外,可以在开始masterqueue之后创建“克隆”(使用__old技巧)。
当然......麻烦的是,如果有在master_queue中传输的“50Mb”...... consumer_queues中将会出现“500Mb”(50 * 10)。 如果需要,我想按需创建消费者队列。 – manatlan 2012-07-25 15:30:52
我想提两点1)我们谈论的是10倍的引用而不是10倍的实际数据字节。如果每个项目都是一个复杂的对象,这是一个巨大的差异。 2)我们正在讨论'10倍内存访问'而不是'10倍内存空间'。在大多数情况下,这不会是瓶颈。实际上,由于每个消费者队列需要访问主队列中的所有项目。因此,'10x内存访问'是不可避免的。 – 2012-07-25 15:45:54
你是对的......但在我的情况下,主队列将包含mp3流的chunck。我需要“克隆”,因为很多“mp3 http clients”请求很多次流(获取信息,大小,标签,格式...)... – manatlan 2012-07-26 07:58:49