2012-01-27 39 views
26

我想在使用过程对象的Python中使用工作者池。每个工作人员(一个进程)都会进行一些初始化(需要花费很少的时间),通过一系列工作(理想情况下使用map())并返回一些内容。除此之外,没有必要进行沟通。但是,我似乎无法弄清楚如何使用map()来使用我的工作人员的compute()函数。python池与工人进程

from multiprocessing import Pool, Process 

class Worker(Process): 
    def __init__(self): 
     print 'Worker started' 
     # do some initialization here 
     super(Worker, self).__init__() 

    def compute(self, data): 
     print 'Computing things!' 
     return data * data 

if __name__ == '__main__': 
    # This works fine 
    worker = Worker() 
    print worker.compute(3) 

    # workers get initialized fine 
    pool = Pool(processes = 4, 
       initializer = Worker) 
    data = range(10) 
    # How to use my worker pool? 
    result = pool.map(compute, data) 

是作业队列去替代方式,或者我可以使用map()

+0

所有过程对象是有状态。您可能希望从标题中删除该单词。也。 'compute'是Worker的一种方法。在这些例子中,它通常是一个完全独立的功能。为什么不写计算函数来简单地包括初始化和处理? – 2012-01-27 19:48:48

+0

够公平的,谢谢。初始化需要很长时间,所以我只想在每个工作进程中执行一次。 – Felix 2012-01-27 20:14:18

+0

你必须要强调“通过一系列工作”的部分问题。由于这并不明显。 – 2012-01-27 20:19:45

回答

50

我建议你为此使用一个队列。

class Worker(Process): 
    def __init__(self, queue): 
     super(Worker, self).__init__() 
     self.queue= queue 

    def run(self): 
     print 'Worker started' 
     # do some initialization here 

     print 'Computing things!' 
     for data in iter(self.queue.get, None): 
      # Use data 

现在,你可以从一个单一的队列

request_queue = Queue() 
for i in range(4): 
    Worker(request_queue).start() 
for data in the_real_source: 
    request_queue.put(data) 
# Sentinel objects to allow clean shutdown: 1 per worker. 
for i in range(4): 
    request_queue.put(None) 

这种事情应该让你分摊在多个工人昂贵的启动成本开始了一堆这些,都让工作。

+0

这就是我想的,谢谢!我最终使用作业队列(输入)和结果队列(输出)来同步所有内容。 – Felix 2012-01-30 18:44:55

+0

你的例子真棒,我现在尝试如何输入哨兵对象时,没有exepction按下strg + c – Dukeatcoding 2013-06-26 09:55:48

+0

@ S.Lott:是不是队列不可pickle-able?这就是为什么你使用[multiprocessing.Manager().Queue](http://stackoverflow.com/questions/3217002/how-do-you-pass-a-queue-reference-to-a-function-managed-by -pool-MAP-异步)? – zuuz 2013-12-16 12:51:52

4

initializer需要一个可执行的可调用来执行初始化,例如,它可以设置一些全局变量,而不是一个Process子类; map接受一个任意的可迭代:

#!/usr/bin/env python 
import multiprocessing as mp 

def init(val): 
    print('do some initialization here') 

def compute(data): 
    print('Computing things!') 
    return data * data 

def produce_data(): 
    yield -100 
    for i in range(10): 
     yield i 
    yield 100 

if __name__=="__main__": 
    p = mp.Pool(initializer=init, initargs=('arg',)) 
    print(p.map(compute, produce_data()))