2012-04-27 63 views
0

我注意到这种行为在python池分配。即使我在池中有20个进程,但是当我为8个进程执行map_async时,我不会执行所有进程,而只会执行4个进程。当那4个完成时,它再发送两个,然后当这两个完成时发送一个。多处理 - 池分配

当我抛出20多个数据时,它会全部运行20次,直到队列中开始少于20个时,重复上述行为。

我认为这是有意完成的,但看起来很奇怪。我的目标是尽快处理请求,显然这种行为不适合。

使用Python 2.6 billiard为maxtasksperchild支持

任何想法我怎么能提高呢?

代码:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10) 

while True: 
    lines = DbData.GetAll() 
    if len(lines) > 0: 
     print 'Starting to process: ', len(lines), ' urls' 
     Res = mypool.map_async(RunChild, lines) 
     Returns = Res.get(None) 
     print 'Pool returns: ', idx, Returns 
    else: 
     time.sleep(0.5) 

回答

2

一种方式我处理在Python多是:

我有,我想使用的功能function()数据。
首先我创建了一个多子类:

import multiprocessing 

class ProcessThread(multiprocessing.Process): 
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue): 
     self.id_t = id_t 
     self.inputlist = inputqueue 
     self.idqueue = idqueue 
     self.function = function 
     self.resultqueue = resultqueue 

     multiprocessing.Process.__init__(self) 

    def run(self): 
     s = "process number: " + str(self.id_t) + " starting" 
     print s 
     result = [] 

     while self.inputqueue.qsize() > 0 
      try: 
       inp = self.inputqueue.get() 
      except Exception: 
       pass 
      result = self.function(inp) 
      while 1: 
       try: 
        self.resultqueue.put([self.id,]) 
       except Exception: 
        pass 
       else: 
        break 
      self.idqueue.put(id) 
      return 

和主要功能:

inputqueue = multiprocessing.Queue() 
resultqueue = multiprocessing.Queue() 
idqueue = multiprocessing.Queue() 

def function(data): 
    print data # or what you want 

for datum in data: 
    inputqueue.put(datum) 

for i in xrange(nbprocess): 
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start() 

最后得到的结果:

results = [] 
while idqueue.qsize() < nbprocess: 
    pass 
while resultqueue.qsize() > 0: 
    results.append(resultqueue.get()) 

通过这种方式,你可以完全控制哪些附加与过程和其他的东西。 使用多处理inputqueue是一种有效的技术,只有当每个数据的计算相当慢(< 1,2秒)时,由于不同进程并发访问队列(为什么我使用异常)。如果你的函数计算速度非常快,考虑在开始时只分解一次数据,并把每个进程的数据集块放在一起。

+0

谢谢,这有助于改进我的脚本。我摆脱了默认池处理,并根据您的示例实施了我自己的处理。 – SorinV 2012-04-30 03:29:16