2014-02-06 47 views
2

我怎么能脚本,使用两个队列,这些的?:Python的多与更新队列和输出队列

  1. 一个作为与一些数据开始工作队列,取决于一个Python多进程要并行执行的功能条件,可以动态地接收进一步的任务,另一个收集结果并在处理完成后用于记录结果。

我基本上需要在工作队列中放置更多的任务,这取决于我在其初始项目中找到的内容。我在下面发表的例子很愚蠢(我可以根据自己的喜好将其转换为输出队列),但其机制很清晰,并反映了我需要开发的一部分概念。

特此我尝试:

import multiprocessing as mp 

def worker(working_queue, output_queue): 
    item = working_queue.get() #I take an item from the working queue 
    if item % 2 == 0: 
     output_queue.put(item**2) # If I like it, I do something with it and conserve the result. 
    else: 
     working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue 

if __name__ == '__main__': 
    static_input = range(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?). 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    for result in iter(output_q.get, None): 
     print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible. 

这并没有结束,也没有打印任何结果。

在整个过程结束时,我想确保工作队列是空的,并且所有并行函数都在迭代后取出结果之前写入输出队列。你有如何使其工作的建议?

回答

2

您在创建进程的行中存在拼写错误。它应该是mp.Process,而不是mp.process。这是什么导致你得到的例外。

此外,您不在工作中循环,因此它们实际上仅从队列中消耗一个物品,然后退出。在不了解更多关于所需逻辑的情况下,提供具体建议并不容易,但您可能需要将worker函数的主体放入while True循环中,并在工作完成时在主体中添加一个条件以退出。

请注意,如果您没有添加条件以显式退出循环,则当队列为空时,您的工作人员将永远停止。您可能会考虑使用所谓的毒丸技术来向他们可能退出的工人发出信号。您将在PyMOTW关于Communication Between processes的文章中找到一个示例和一些有用的讨论。

至于要使用的进程数量,您需要进行基准测试以找出适合您的方法,但是,一般来说,每个内核的一个进程是您的工作负载受CPU限制的良好起点。如果您的工作负载是IO绑定的,那么您可能会获得更好的结果,并且员工数量更多。

+0

谢谢!我将在问题中编辑这个。 – Jaqo

+0

不客气!请不要在你这样做的时候编辑我的回复,开始处理你问题的其余部分。 – tawmas

+0

我刚刚读了你答案的其余部分。我会尝试应用一段时间True循环。我想知道如果队列中没有更多项目可以处理,那么过程是否完成。我想使用类似队列长度的东西,但文档声明这是不可靠的。 – Jaqo

1

以下代码实现了预期结果。它遵循@tawmas提出的建议。

此代码允许在需要一个过程使用多个内核,该工人可以通过将其在处理过程中被更新提要数据的队列:

import multiprocessing as mp 
def worker(working_queue, output_queue): 
    while True: 
     if working_queue.empty() == True: 
      break #this is the so-called 'poison pill'  
     else: 
      picked = working_queue.get() 
      if picked % 2 == 0: 
        output_queue.put(picked) 
      else: 
       working_queue.put(picked+1) 
    return 

if __name__ == '__main__': 
    static_input = xrange(100)  
    working_q = mp.Queue() 
    output_q = mp.Queue() 
    results_bank = [] 
    for i in static_input: 
     working_q.put(i) 
    processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] 
    for proc in processes: 
     proc.start() 
    for proc in processes: 
     proc.join() 
    results_bank = [] 
    while True: 
     if output_q.empty() == True: 
      break 
     results_bank.append(output_q.get_nowait()) 
    print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed. 
    results_bank.sort() 
    print results_bank 
+0

一旦结果队列为空,您的打印循环将永久等待。您应该使用get_nowait并显式捕获Empty异常以完全退出。 – tawmas

+0

再次感谢您的帮助。我正在做一个 尝试:\ n打印结果\ n,除了空:\ n break \ n' 这是打印预期的总体结果,但控制台输出仍抱怨异常。我认为我没有妥善处理它。 – Jaqo

+0

你需要从你的尝试中的队列中获得。 – tawmas