2015-04-16 40 views
0

我想使用python gevent库来实现一个生产者和多个消费者服务器。有我的尝试:gevent队列失败,LoopExit

class EmailValidationServer(): 
    def __init__(self): 
     self.queue = Queue() 
    def worker(self): 
     while True: 
      json = self.queue.get() 
    def handler(self,socket,address): 
     fileobj = socket.makefile() 
     content = fileobj.read(max_read) 
     contents = json.loads(content) 
     for content in contents: 
      self.queue.put(content) 
    def daemon(self,addr='127.0.0.1',num_thread=5): 
     pool = Pool(1000) 
     server = StreamServer((addr, 6000),self.handler,spawn=pool) # run 
     pool = ThreadPool(num_thread) 
     for _ in range(num_thread): 
      pool.spawn(self.worker) 
     server.serve_forever() 
if __name__ == "__main__": 
    email_server = EmailValidationServer() 
    email_server.daemon() 

我用gevent.queue.Queue的队列。它给我的错误信息:

LoopExit: This operation would block forever 
(<ThreadPool at 0x7f08c80eef50 0/4/5>, 
<bound method EmailValidationServer.worker of <__main__.EmailValidationServer instance at 0x7f08c8dcd998>>) failed with LoopExit 

问题:但是,当我从GEVENT的实施蟒蛇内建库改变队列,它的工作原理。我不知道原因,我想它的实施有所不同。我不知道为什么gevent不允许无限等待。有没有人可以给出解释?感谢提前

回答

2

我建议你可以使用gevent.queue.JoinableQueue()而不是Python的内置Queue()。您可以参阅API用法正式队列指南(http://www.gevent.org/gevent.queue.html

def worker(): 
    while True: 
     item = q.get() 
     try: 
      do_work(item) 
     finally: 
      q.task_done() 

q = JoinableQueue() 
for i in range(num_worker_threads): 
    gevent.spawn(worker) 

for item in source(): 
    q.put(item) 

q.join() # block until all tasks are done 

如果再次遇到了例外,你最好充分了解GEVENT的原则corouinte控制流程......一旦你明白了吧,那不是什么大问题。 :)

+1

@SuperBiasedMan thans格式化我的代码块:) –