2016-02-29 29 views
0

我想为4个异步方法使用4个进程。Python3池异步进程|工人

这是我的1种异步方法(x)的代码:根据

from multiprocessing import Pool 
import time 

def x(i): 
    while(i < 100): 
      print(i) 
      i += 1 
      time.sleep(1) 

def finish(str): 
    print("done!") 

if __name__ == "__main__": 
    pool = Pool(processes=5) 
    result = pool.apply_async(x, [0], callback=finish) 

print("start") 

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.JoinableQueue 在池的参数的过程是工人的数量。

我该如何使用这些工作人员?

编辑:我ASYNC类

from multiprocessing import Pool 
import time 

class ASYNC(object): 
    def __init__(self, THREADS=[]): 
     print('do') 
     pool = Pool(processes=len(THREADS)) 
     self.THREAD_POOL = {} 
     thread_index = 0 
     for thread_ in THREADS: 
      self.THREAD_POOL[thread_index] = { 
       'thread': thread_['thread'], 
       'args': thread_['args'], 
       'callback': thread_['callback'] 
      } 
      pool.apply_async(self.run, [thread_index], callback=thread_['callback']) 
      self.THREAD_POOL[thread_index]['running'] = True 
      thread_index += 1 
    def run(self, thread_index): 
     print('enter') 
     while(self.THREAD_POOL[thread_index]['running']): 
      print("loop") 
      self.THREAD_POOL[thread_index]['thread'](self.THREAD_POOL[thread_index]) 
      time.sleep(1) 
     self.THREAD_POOL[thread_index]['running'] = False 
    def wait_for_finish(self): 
     for pool in self.THREAD_POOL: 
      while(self.THREAD_POOL[pool]['running']): 
       time.sleep(1) 
def x(pool): 
    print(str(pool)) 
    pool['args'][0] += 1 


def y(str): 
    print("done") 

A = ASYNC([{'thread': x, 'args':[10], 'callback':y}]) 

print("start") 
A.wait_for_finish() 
+0

apply_async是否适用于下一个可用的进程? –

回答

0

multiprocessing.Pool被设计为工作分配给工人池的一种方便的方式,而不必担心哪一个工作做哪些工作。它的大小之所以能够让你懒散地将工作分派到队列中,以及限制创建子进程的昂贵(相对)的开销。

所以你的问题的答案原则上你不应该能够访问池中的个别工作人员。如果您希望能够单独解决的工人,你将需要实现自己的工作分配系统,并使用multiprocessing.Process,是这样的:

from multiprocessing import Process 

def x(i): 
    while(i < 100): 
     print(i) 
     i += 1 

pools = [Process(target=x, args=(1,)) for _ in range(5)] 
map(lambda pool: pool.start(), pools) 
map(lambda pool: pool.join(), pools) 
print('Done!') 

现在你可以直接访问每一个工人。如果您希望能够在每个工作人员正在运行时动态地发送工作(不仅仅是像我在我的例子中那样做一件事),那么您必须自己实施,有可能使用multiprocessing.Queue。看看multiprocessing的代码,看看如何将工作分配给其工作人员以了解如何执行此操作。

为什么你想这样做呢?如果只是担心工人是否有效地安排工作,那么我的建议就是相信multiprocessing为您解决问题,除非您有充分的证据表明您的情况并非因为某种原因。

+0

我不认为这就是我要找的 –

+0

好的,你能详细解释一下你在找什么吗?只是你想知道每次调用'x()'的执行结束了吗? – daphtdazz