
I am trying to understand multiprocessing in Python. Currently, I am working on improving my understanding of queues and processes. How do I make sure all processes have written to the output queue before I move on?

What I want to do is iterate over some data, sending chunks of it off to be analyzed by worker functions spawned earlier. As the MWE below shows, the result is sometimes computed before the workers have had time to react to their data. What is a good way to make sure my workers have finished before I move on? I know about the Pool.join() method; is there something similar here? I know Pool.map can do this in chunks, but if I give it an iterator over a large file (which is the eventual goal here), it will still try to read the entire file first rather than start working on the chunks right away.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time
import queue

def worker(inqueue, outqueue, name=None):
    if name is None:
        name = mp.current_process().pid
    print("Spawned", name)

    while True:
        # Read data from input queue
        data = inqueue.get()

        # Kill worker if input is None
        if data is None:
            print("Killing", name)
            return None

        # Compute partial sum and put on output queue
        print(name, "got data:", data)
        partial_sum = sum(data)
        outqueue.put(partial_sum)

if __name__ == '__main__':
    numbers = range(1, 101)
    buffer_size = 7  # Number of items for each partial sum

    inqueue = mp.Queue()
    outqueue = mp.Queue()

    # Define and start processes
    processes = []
    for i in range(1, 5):
        p = mp.Process(target=worker,
                       args=(inqueue, outqueue, "process %d" % i))
        p.start()
        processes.append(p)

    # Run through numbers, periodically sending buffer contents to a worker
    buffer = []
    for num in numbers:
        buffer.append(num)
        if len(buffer) >= buffer_size:
            inqueue.put(buffer)
            buffer = []

    # Send remaining contents of buffer to worker
    inqueue.put(buffer)

    # Kill all processes
    for _ in range(len(processes)):
        inqueue.put(None)

    # Compute running sum as long as output queue contains stuff
    remaining = True
    running = 0

    # time.sleep(1)  # Output is as expected if we sleep for 1 sec

    while remaining:
        try:
            temp = outqueue.get(False)
            running += temp
        except queue.Empty:
            remaining = False

    print(running)  # 0 if no sleep. 5050 if sleep.

Answers


A note on the phrase you are using: # Kill all processes

What you are actually doing there is stopping a Process gracefully at a controlled point. Killing would mean interrupting it at an unpredictable point of execution, which is not recommended.

As for the question "... to make sure my workers have finished": use the Process.is_alive() method to do this.

while any(p.is_alive() for p in processes):
    time.sleep(0.2)

From the multiprocessing.Process documentation:

is_alive()

Return whether the process is alive. Roughly, a process object is alive from the moment the start() method returns until the child process terminates.
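Since the workers in the question's code return after consuming a None sentinel, another option is simply to join() each process instead of polling: Process.join() blocks until the child has exited, so once every join returns, all partial sums are on the output queue. A minimal sketch against the question's code (with one caveat from the docs: joining a process that still has large unflushed items on a queue can deadlock, but the small integers here flush without trouble):

# After putting one None sentinel per worker on inqueue, wait for
# every worker to exit before draining the output queue.
for p in processes:
    p.join()  # blocks until this worker has returned

# Every partial sum is now on outqueue, so the drain loop prints 5050.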


I don't think you should give up on using multiprocessing.Pool, because it can hand work to the processes asynchronously and incrementally, and it allows for simplified handling.

Below is an example based on the code in your question. Note that it uses a multiprocessing.Event to tell the Pool processes to quit; you could probably also solve that with the sentinel technique you are already using.

import multiprocessing as mp
import time
import queue

def worker(args):
    event, inqueue, outqueue, name = args
    print("{} started".format(name))

    while not event.is_set():  # not stopped
        data = inqueue.get()  # Read data from input queue
        print(name, "got data:", data)
        # Compute and put partial sum on output queue
        partial_sum = sum(data)
        outqueue.put(partial_sum)

if __name__ == '__main__':

    CHUNK_SIZE = 7  # Number of items for each partial sum
    NUM_PROCESSES = 4
    numbers = range(1, 101)  # Data to process.

    mgr = mp.Manager()
    inqueue = mgr.Queue()
    outqueue = mgr.Queue()
    event = mgr.Event()

    # Create and start the processes in a processing pool
    pool = mp.Pool(processes=NUM_PROCESSES)
    args = [(event, inqueue, outqueue, "Process %d" % (i+1,))
            for i in range(NUM_PROCESSES)]
    pool.map_async(worker, args)
    pool.close()

    # Put numbers to process into the work queue in chunks
    for i in range(0, len(numbers), CHUNK_SIZE):
        chunk = list(numbers[i: i+CHUNK_SIZE])
        print('putting data:', chunk)
        inqueue.put(chunk)

    while not inqueue.empty():  # All data processed?
        time.sleep(.001)
    event.set()  # signal all data processed
    pool.terminate()

    # Total all the values in output queue
    final_sum = 0
    while True:
        try:
            temp = outqueue.get_nowait()
            final_sum += temp
        except queue.Empty:
            break

    print('final sum:', final_sum)  # 5050 if correct

Output from a typical test run:

putting data: [1, 2, 3, 4, 5, 6, 7] 
putting data: [8, 9, 10, 11, 12, 13, 14] 
putting data: [15, 16, 17, 18, 19, 20, 21] 
putting data: [22, 23, 24, 25, 26, 27, 28] 
putting data: [29, 30, 31, 32, 33, 34, 35] 
putting data: [36, 37, 38, 39, 40, 41, 42] 
putting data: [43, 44, 45, 46, 47, 48, 49] 
putting data: [50, 51, 52, 53, 54, 55, 56] 
putting data: [57, 58, 59, 60, 61, 62, 63] 
putting data: [64, 65, 66, 67, 68, 69, 70] 
putting data: [71, 72, 73, 74, 75, 76, 77] 
putting data: [78, 79, 80, 81, 82, 83, 84] 
putting data: [85, 86, 87, 88, 89, 90, 91] 
putting data: [92, 93, 94, 95, 96, 97, 98] 
putting data: [99, 100] 
Process 1 started 
Process 2 started 
Process 1 got data: [1, 2, 3, 4, 5, 6, 7] 
Process 1 got data: [8, 9, 10, 11, 12, 13, 14] 
Process 1 got data: [15, 16, 17, 18, 19, 20, 21] 
Process 2 got data: [22, 23, 24, 25, 26, 27, 28] 
Process 2 got data: [29, 30, 31, 32, 33, 34, 35] 
Process 1 got data: [36, 37, 38, 39, 40, 41, 42] 
Process 3 started 
Process 2 got data: [43, 44, 45, 46, 47, 48, 49] 
Process 1 got data: [50, 51, 52, 53, 54, 55, 56] 
Process 2 got data: [57, 58, 59, 60, 61, 62, 63] 
Process 3 got data: [64, 65, 66, 67, 68, 69, 70] 
Process 1 got data: [71, 72, 73, 74, 75, 76, 77] 
Process 2 got data: [78, 79, 80, 81, 82, 83, 84] 
Process 4 started 
Process 1 got data: [85, 86, 87, 88, 89, 90, 91] 
Process 2 got data: [92, 93, 94, 95, 96, 97, 98] 
Process 3 got data: [99, 100] 
final sum: 5050 
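One caveat about the code above: inqueue.empty() only says the queue has been drained, not that the last chunk has actually finished processing, which is why the loop has to keep polling. Since Manager().Queue() returns a proxy for queue.Queue, the proxy also exposes task_done() and join(), which close that gap. The following is a sketch of that variant, not the answer author's code; the four workers and 7-item chunks just mirror the example above:

import multiprocessing as mp

def worker(inqueue, outqueue):
    while True:
        data = inqueue.get()
        outqueue.put(sum(data))
        inqueue.task_done()  # acknowledge this chunk as fully processed

if __name__ == '__main__':
    mgr = mp.Manager()
    inqueue = mgr.Queue()
    outqueue = mgr.Queue()

    # Plain daemon processes; they are reaped when the main process exits
    workers = [mp.Process(target=worker, args=(inqueue, outqueue), daemon=True)
               for _ in range(4)]
    for w in workers:
        w.start()

    for i in range(0, 101, 7):
        inqueue.put(list(range(i, min(101, i + 7))))

    inqueue.join()  # returns only once every chunk has been task_done()'d

    total = 0
    while not outqueue.empty():
        total += outqueue.get()
    print(total)  # 5050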

You can often simplify this type of code, and avoid the need to do the queueing and its management yourself, by taking advantage of Pool.map or Pool.imap.

A simple example:

from multiprocessing import Pool

def work_items(n, step):
    for i in range(0, n, step):
        yield range(i, min(n, i + step))

def worker(item):
    return sum(item)

if __name__ == '__main__':  # guard needed for multiprocessing on spawn platforms
    with Pool(4) as pool:
        total = sum(pool.imap(worker, work_items(101, 7)))
        print(total)  # 5050
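This approach also covers the question's eventual goal of a large file. Pool.map materializes the entire iterable before dispatching any work, whereas Pool.imap pulls items from a generator as it feeds the workers, so processing starts immediately. A hypothetical sketch, assuming a text file numbers.txt with one integer per line (the file name and the read_chunks helper are illustrative, not from the original answer):

from itertools import islice
from multiprocessing import Pool

def read_chunks(path, size):
    # Lazily yield lists of `size` parsed lines; the whole file is
    # never held in memory at once.
    with open(path) as f:
        while True:
            chunk = [int(line) for line in islice(f, size)]
            if not chunk:
                return
            yield chunk

def worker(chunk):
    return sum(chunk)

if __name__ == '__main__':
    with Pool(4) as pool:
        # imap hands chunks to workers as soon as the generator
        # produces them, instead of reading the whole file first.
        total = sum(pool.imap(worker, read_chunks('numbers.txt', 7)))
        print(total)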