I am trying to get my head around multiprocessing in Python, and at the moment I am working on my understanding of queues and processes. How can I make sure that all processes have written to the output queue before I continue?
What I want to do is iterate over some data and send chunks of it off to be analysed by previously spawned worker processes. As the MWE below shows, the result is sometimes computed before the workers have had time to react to their data. What is a good way to make sure my workers are finished before I continue? I know about the Pool.join() method; is there something similar here? I know Pool.map can work in chunks, but if I give it an iterator over a large file (which is the end goal of this exercise), it still tries to read the entire file up front instead of starting to work on chunks right away. I have sketched two possible workarounds after the MWE, but I am not sure either of them is the right approach.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import multiprocessing as mp
import time
import queue


def worker(inqueue, outqueue, name=None):
    if name is None:
        name = mp.current_process().pid
    print("Spawned", name)
    while True:
        # Read data from input queue
        data = inqueue.get()
        # Kill worker if input is None
        if data is None:
            print("Killing", name)
            return None
        # Compute partial sum and put on output queue
        print(name, "got data:", data)
        partial_sum = sum(data)
        outqueue.put(partial_sum)


if __name__ == '__main__':
    numbers = range(1, 101)
    buffer_size = 7  # Number of items for each partial sum
    inqueue = mp.Queue()
    outqueue = mp.Queue()

    # Define and start processes
    processes = []
    for i in range(1, 5):
        p = mp.Process(target=worker,
                       args=(inqueue, outqueue, "process %d" % i))
        p.start()
        processes.append(p)

    # Run through numbers, periodically sending buffer contents to a worker
    buffer = []
    for num in numbers:
        buffer.append(num)
        if len(buffer) >= buffer_size:
            inqueue.put(buffer)
            buffer = []

    # Send remaining contents of buffer to worker
    inqueue.put(buffer)

    # Kill all processes
    for _ in range(len(processes)):
        inqueue.put(None)

    # Compute running sum as long as output queue contains stuff
    remaining = True
    running = 0
    #time.sleep(1)  # Output is as expected if we sleep for 1 sec
    while remaining:
        try:
            temp = outqueue.get(False)
            running += temp
        except queue.Empty:
            remaining = False

    print(running)  # 0 if no sleep. 5050 if sleep.
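For reference, the first workaround I have been considering is to count how many chunks I put on inqueue and then make exactly that many blocking outqueue.get() calls before reading off the total, joining the workers at the end. This is only a sketch (it reuses worker, numbers, buffer_size, the two queues and the processes list from the MWE above), and I do not know whether it is the idiomatic way to do this:

# Sketch 1: count the chunks that were sent, then block on exactly
# that many results so nothing is missed by an early queue.Empty.
num_chunks = 0
buffer = []
for num in numbers:
    buffer.append(num)
    if len(buffer) >= buffer_size:
        inqueue.put(buffer)
        num_chunks += 1
        buffer = []
if buffer:
    inqueue.put(buffer)
    num_chunks += 1

# Blocking get() waits for each partial sum to actually arrive.
running = sum(outqueue.get() for _ in range(num_chunks))

# Shut the workers down and wait for them to exit.
for _ in processes:
    inqueue.put(None)
for p in processes:
    p.join()

print(running)  # 5050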
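The other direction I have looked at is dropping the hand-rolled queues and using a Pool, since iterating over the result of imap_unordered only finishes once every worker has returned its piece. My understanding is that, unlike Pool.map, imap_unordered does not call list() on its input up front, although I have not checked how it behaves memory-wise on a genuinely large file, so this is also just a sketch of the idea:

import multiprocessing as mp


def chunks(iterable, size):
    """Yield successive lists of `size` items from any iterable."""
    buffer = []
    for item in iterable:
        buffer.append(item)
        if len(buffer) >= size:
            yield buffer
            buffer = []
    if buffer:
        yield buffer


if __name__ == '__main__':
    numbers = range(1, 101)
    with mp.Pool(4) as pool:
        # Summing the result iterator waits for every partial sum,
        # so the total is only printed after all chunks are processed.
        total = sum(pool.imap_unordered(sum, chunks(numbers, 7)))
    print(total)  # 5050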