2013-04-21 117 views
7

在我的应用程序中,我使用多处理模块中的管道在python进程之间进行通信。 最近我观察到一个奇怪的行为,这取决于我通过它们发送的数据的大小。 根据python文档,这些管道是基于连接的,并且应该以异步方式运行,但有时他们会在发送时卡住。如果我在每个连接中启用全双工,即使我没有使用连接进行发送和收听,一切正常。 任何人都可以解释这种行为?Python管道的同步/异步行为

  1. 100辆彩车,全双工禁用
    代码工作,利用asynchronousness。
  2. 100浮动,全双工启用
    该示例正常工作正常。
  3. 10000浮点数,全双工禁用
    尽管数据量较小,但执行永远被阻止。
  4. 10000浮点数,全双工启用
    再次罚款。

代码(这不是我的生产代码,它只是说明我的意思):

from collections import deque 
from multiprocessing import Process, Pipe 
from numpy.random import randn 
from os import getpid 

PROC_NR = 4 
DATA_POINTS = 100 
# DATA_POINTS = 10000 


def arg_passer(pipe_in, pipe_out, list_): 
    my_pid = getpid() 
    print "{}: Before send".format(my_pid) 
    pipe_out.send(list_) 
    print "{}: After send, before recv".format(my_pid) 
    buf = pipe_in.recv() 
    print "{}: After recv".format(my_pid) 


if __name__ == "__main__": 
    pipes = [Pipe(False) for _ in range(PROC_NR)] 
    # pipes = [Pipe(True) for _ in range(PROC_NR)] 
    pipes_in = deque(p[0] for p in pipes) 
    pipes_out = deque(p[1] for p in pipes) 
    pipes_in.rotate(1) 
    pipes_out.rotate(-1) 

    data = [randn(DATA_POINTS) for foo in xrange(PROC_NR)] 
    processes = [Process(target=arg_passer, args=(pipes_in[foo], pipes_out[foo], data[foo])) 
       for foo in xrange(PROC_NR)] 

    for proc in processes: 
     proc.start() 

    for proc in processes: 
     proc.join() 

回答

8

首先,值得注意的multiprocessing.Pipe类的实现......

def Pipe(duplex=True): 
    ''' 
    Returns pair of connection objects at either end of a pipe 
    ''' 
    if duplex: 
     s1, s2 = socket.socketpair() 
     s1.setblocking(True) 
     s2.setblocking(True) 
     c1 = _multiprocessing.Connection(os.dup(s1.fileno())) 
     c2 = _multiprocessing.Connection(os.dup(s2.fileno())) 
     s1.close() 
     s2.close() 
    else: 
     fd1, fd2 = os.pipe() 
     c1 = _multiprocessing.Connection(fd1, writable=False) 
     c2 = _multiprocessing.Connection(fd2, readable=False) 

    return c1, c2 

区别在于半双工'管道'使用anonymous pipe,但全双工'管道'实际上使用Unix domain socket,因为匿名管道本质上是单向的。

我不确定在这种情况下术语“异步”是什么意思。如果你的意思是“非阻塞I/O”,那么值得注意的是这两个实现默认都使用阻塞I/O。


其次,值得注意你想发送的数据的大小腌制...

>>> from numpy.random import randn 
>>> from cPickle import dumps 
>>> len(dumps(randn(100))) 
2479 
>>> len(dumps(randn(10000))) 
237154 

第三,从pipe(7)手册页...

管道容量

管道容量有限。如果管道已满,则写入(2)将阻止 或失败,具体取决于是否设置了O_NONBLOCK标志(请参见下文)。对于管道容量,不同的实施方式有不同的限制。应用程序应该不依赖于特定的容量:应该设计应用程序,以便 读取过程在数据可用时立即使用数据,以便写入过程 不会被阻塞。

在2.6.11之前的Linux版本中,管道的容量与系统页面大小 (例如i386上的4096字节)相同。自Linux 2.6.11以来,管道容量为 65536字节。


所以,实际上,你已经创建,所有的子进程都挡在了pipe_out.send()呼叫的僵局,没有人可以从其他进程接收到任何数据,因为你发送的所有一次命中237,154个字节的数据,填充了65,536字节的缓冲区。

您可能会试图使用Unix域套接字版本,但目前它的唯一原因是它的缓冲区大小超过管道,并且您会发现如果您增加该解决方案也会失败DATA_POINTS的数量为100,000。

“quick n'dirty hack”解决方案是将数据分成更小的块进行发送,但依赖于特定大小的缓冲区并不是一个好习惯。

更好的解决方案是在调用pipe_out.send()时使用非阻塞I/O,但我对multiprocessing模块不熟悉,无法确定使用该模块实现它的最佳方式。

中的伪是沿行...

while 1: 
    if we have sent all data and received all data: 
     break 
    send as much data as we can without blocking 
    receive as much data as we can without blocking 
    if we didn't send or receive anything in this iteration: 
     sleep for a bit so we don't waste CPU time 
     continue 

...或者你可以使用Python select模块,以避免睡眠时间超过是必要的,但同样,有multiprocessing.Pipe它整合可能会很棘手。

有可能multiprocessing.Queue类为你做这一切,但我以前从未使用它,所以你必须做一些实验。

+0

谢谢您的详细解答。现在我明白了这个问题。 – Michal 2013-04-29 06:25:54