在我的应用程序中,我使用多处理模块中的管道在python进程之间进行通信。 最近我观察到一个奇怪的行为,这取决于我通过它们发送的数据的大小。 根据python文档,这些管道是基于连接的,并且应该以异步方式运行,但有时他们会在发送时卡住。如果我在每个连接中启用全双工,即使我没有使用连接进行发送和收听,一切正常。 任何人都可以解释这种行为?Python管道的同步/异步行为
- 100辆彩车,全双工禁用
代码工作,利用asynchronousness。 - 100浮动,全双工启用
该示例正常工作正常。 - 10000浮点数,全双工禁用
尽管数据量较小,但执行永远被阻止。 - 10000浮点数,全双工启用
再次罚款。
代码(这不是我的生产代码,它只是说明我的意思):
from collections import deque
from multiprocessing import Process, Pipe
from numpy.random import randn
from os import getpid
PROC_NR = 4
DATA_POINTS = 100
# DATA_POINTS = 10000
def arg_passer(pipe_in, pipe_out, list_):
my_pid = getpid()
print "{}: Before send".format(my_pid)
pipe_out.send(list_)
print "{}: After send, before recv".format(my_pid)
buf = pipe_in.recv()
print "{}: After recv".format(my_pid)
if __name__ == "__main__":
pipes = [Pipe(False) for _ in range(PROC_NR)]
# pipes = [Pipe(True) for _ in range(PROC_NR)]
pipes_in = deque(p[0] for p in pipes)
pipes_out = deque(p[1] for p in pipes)
pipes_in.rotate(1)
pipes_out.rotate(-1)
data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)]
processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo]))
for foo in xrange(PROC_NR)]
for proc in processes:
proc.start()
for proc in processes:
proc.join()
谢谢您的详细解答。现在我明白了这个问题。 – Michal 2013-04-29 06:25:54