2009-05-27 55 views
11

我正在编写一个生产者和多个消费者的服务器程序, 什么使我困惑的只是生产者放入队列的第一个任务获取 消耗,之后排队的任务不再消耗,他们永远在队列中保持 。python多重处理的生产者/消费者问题

from multiprocessing import Process, Queue, cpu_count 
from http import httpserv 
import time 

def work(queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(5) 
     print "task done:", task 
    queue.put(None) 

class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     self.workers = [Process(target=work, args=(self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     httpserv(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESSES): 
      self.workers[i].join() 
     queue.close() 

Manager().start() 

生产者是把一个任务队列中的一次接收 来自用户的请求的HTTP服务器。看起来消费者进程仍然在队列中有新任务时被阻塞,这很奇怪。

P.S.另外两个问题与上述不相关,我不确定是否 最好把HTTP服务器放在除主进程外的其他进程中,如果是的话,我怎么能让主进程在所有子进程结束之前保持运行。第二个问题,优雅地阻止 HTTP服务器的最佳方式是什么?

编辑:添加生产者代码,它只是一个简单的Python WSGI服务器:

import fapws._evwsgi as evwsgi 
from fapws import base 

def httpserv(queue): 
    evwsgi.start("0.0.0.0", 8080) 
    evwsgi.set_base_module(base) 

    def request_1(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_1') 
     return ["request 1!"] 

    def request_2(environ, start_response): 
     start_response('200 OK', [('Content-Type','text/html')]) 
     queue.put('task_2') 
     return ["request 2!!"] 

    evwsgi.wsgi_cb(("/request_1", request_1)) 
    evwsgi.wsgi_cb(("/request_2", request_2)) 

    evwsgi.run() 

回答

7

我想一定是坏了Web服务器的一部分,因为这完美的作品:

from multiprocessing import Process, Queue, cpu_count 
import random 
import time 


def serve(queue): 
    works = ["task_1", "task_2"] 
    while True: 
     time.sleep(0.01) 
     queue.put(random.choice(works)) 


def work(id, queue): 
    while True: 
     task = queue.get() 
     if task is None: 
      break 
     time.sleep(0.05) 
     print "%d task:" % id, task 
    queue.put(None) 


class Manager: 
    def __init__(self): 
     self.queue = Queue() 
     self.NUMBER_OF_PROCESSES = cpu_count() 

    def start(self): 
     print "starting %d workers" % self.NUMBER_OF_PROCESSES 
     self.workers = [Process(target=work, args=(i, self.queue,)) 
         for i in xrange(self.NUMBER_OF_PROCESSES)] 
     for w in self.workers: 
      w.start() 

     serve(self.queue) 

    def stop(self): 
     self.queue.put(None) 
     for i in range(self.NUMBER_OF_PROCESS): 
      self.workers[i].join() 
     queue.close() 


Manager().start() 

输出示例:

starting 2 workers 
0 task: task_1 
1 task: task_2 
0 task: task_2 
1 task: task_1 
0 task: task_1 
+0

真棒而如果你能提供一个制片人+多工人的例子。这将是很好的。 – 2017-06-15 08:17:04

4

“第二个问题,优雅地停止HTTP服务器的最佳方法是什么?”

这很难。

您有进程间通信两种选择:

  • 出带外控制。服务器有另一种通信机制。另一个套接字,Unix信号或其他。其他的东西可能是服务器本地目录中的“立即停止”文件。似乎很奇怪,但它确实很好,比引入一个选择循环来侦听多个套接字或信号处理程序以捕获Unis信号更简单。

    “立即停止”文件很容易实现。 evwsgi.run()循环仅在每次请求后检查此文件。为了使服务器停止,你创建文件,执行一个/control请求(这会得到一个500错误或者什么,这并不重要),服务器应该停下来。请记住删除立即停止文件,否则您的服务器将不会重新启动。

  • 带内控件。服务器有另一个URL(/stop),它会阻止它。从表面上看,这看起来像是一场安全噩梦,但它完全取决于该服务器在何处以及如何使用。由于它似乎是一个简单的内部请求队列封装,所以这个额外的URL运行良好。

    为了完成这项工作,您需要编写自己的evwsgi.run()版本,可以通过设置某个变量来终止循环。

编辑

你可能不希望终止您的服务器,因为你不知道国家的它的工作线程。您需要向服务器发出信号,然后您只需等待正常完成。

如果你想强行杀死服务器,那么os.kill()(或multiprocessing.terminate)将工作。当然,除了你不知道子线程在做什么之外。

+0

如何将服务器放在其自己的进程中,并使用multiprocessing.Process.terminate方法来终止进程?这似乎更容易。 – btw0 2009-05-27 14:49:44