2013-05-15 25 views
7

我使用64位Python 3.3.0 CPython解释器在64位Linux(内核 版本2.6.28.4)计算机上运行自定义模拟器(用于生物学)。使用管道在进程之间传输Python对象时的字节限制?

因为模拟器依赖于许多独立实验的有效结果,我建立了并行处理运行实验。 之间的通信主要发生在生产者 - 消费者模式下,具有管理 multiprocessing Queue s (doc)。 该架构的纲要如下:

  • ,处理产卵和管理Process ES和产生模拟
  • 1结果消费者过程中的各种Queue小号
  • n个工作的处理的主流程,消耗模拟结果并对结果进行分类和分析

主进程和工作进程通过输入Queue进行通信。 类似地,工作进程将其结果放置在输出Queue中,结果消费者进程消耗来自其的项目。最终ResultConsumer 对象通过multiprocessing Pipedoc) 传递回主进程。

一切工作正常,直到它试图通过Pipe传递ResultConsumer对象回 程师傅:

Traceback (most recent call last): 
    File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 258, in _bootstrap 
    self.run() 
    File "/home/cmccorma/.local/lib/python3.3/multiprocessing/process.py", line 95, in run 
    self._target(*self._args, **self._kwargs) 
    File "DomainArchitectureGenerator.py", line 93, in ResultsConsumerHandler 
    pipeConn.send(resCon) 
    File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 207, in send 
    self._send_bytes(buf.getbuffer()) 
    File "/home/cmccorma/.local/lib/python3.3/multiprocessing/connection.py", line 394, in _send_bytes 
    self._send(struct.pack("!i", n)) 
struct.error: 'i' format requires -2147483648 <= number <= 2147483647 

我明白的前两个走线(在Process库未处理退出), 和第三是我的代码行,用于将ResultConsumer对象向 Pipe发送到主进程。最后两条痕迹是 有趣的地方。 A Pipe泡菜发送给它的任何对象,并将 产生的字节传递到另一端(匹配连接),在运行recv()时取消其中的 。 self._send_bytes(buf.getbuffer())正在尝试 发送pickle对象的字节。 self._send(struct.pack("!i", n)) is 尝试打包长度为n的整数(网络/大端)的结构, 其中n是作为参数传入的缓冲区的长度(struct 库处理Python值与表示的C结构之间的转换as Python字符串,请参阅the doc)。

只有在尝试大量实验时才会出现此错误,例如, 10个实验 不会导致它,但1000会精心(所有其他参数是恒定的)。至于为什么struct.error被抛出的原因,我最好的 假设是试图按下管道的字节数 超过2^32-1(2147483647)或〜2GB。

所以我的问题是双重的:

  1. 我卡住我的调查,因为struct.py基本上只是 从_struct进口,我不知道在哪里是。

  2. 字节限制似乎是任意的,因为底层架构全部是64位的 。那么,为什么我不能通过比这更大的东西呢?此外,如果我 无法更改此设置,是否有任何有益于此问题的解决方法?

注:我不认为在地方Pipe的使用Queue就能解决的问题,如 我怀疑Queue的使用类似的酸洗中间步骤。 编辑:这个笔记完全不正确,正如在abarnert的答案中指出的那样。

+0

如果你想要一个非常愚蠢的解决方法,你可以打开'process.py'并将该行更改为long或其他东西,但是这样你就必须在任何地方执行它,并且不知道它会破坏什么。 –

+0

@SuperDisk:'process.py'中没有相关的行,并且没有int变为long(在Python 3.x中没有_is_ no'long'类型,甚至在2.7中它们是相同的类型) 。另外,要么monkeypatch'multiprocessing'好多了,要么分叉并保留一个单独的副本,然后修改stdlib。 – abarnert

+0

@abarnert看起来像回溯线349。将'i'更改为'q'(这对结构模块来说很长)。也许我错了? –

回答

8

我陷入了我的调查,因为struct.py基本上只是从_struct导入,我不知道它在哪里。

在CPython的,_struct是从_struct.c在源树中的目录Modules构建的C扩展模块。您可以在线查找代码here

每当foo.py做一个import _foo,这几乎总是一个C扩展模块,通常建立从_foo.c。如果根本找不到foo.py,则可能是一个C扩展模块,它由_foomodule.c构建而成。

即使您不使用PyPy,也经常值得看看equivalent PyPy source。它们以纯Python重新实现了几乎所有的扩展模块 - 对于其余的(包括这种情况),底层的“扩展语言”是RPython,而不是C.

但是,在这种情况下,您不需要知道任何东西关于struct如何超越文档中的内容。鉴于底层架构是所有64位


的字节限制似乎任意的。

看它调用的代码:

self._send(struct.pack("!i", n)) 

如果你看the documentation,将'i'格式字符明确表示“4个字节的整数ç”,而不是“什么ssize_t是”。为此,您必须使用'n'。或者您可能需要明确使用很长的时间,并使用'q'

您可以用monkeypatch multiprocessing来使用struct.pack('!q', n)。或者'!q'。或以struct以外的其他方式编码长度。当然,这会破坏与未修补的multiprocessing的兼容性,如果您尝试在多台计算机上执行分布式处理或某事,这可能会造成问题。但它应该很简单:

def _send_bytes(self, buf): 
    # For wire compatibility with 3.2 and lower 
    n = len(buf) 
    self._send(struct.pack("!q", n)) # was !i 
    # The condition is necessary to avoid "broken pipe" errors 
    # when sending a 0-length buffer if the other end closed the pipe. 
    if n > 0: 
     self._send(buf) 

def _recv_bytes(self, maxsize=None): 
    buf = self._recv(8) # was 4 
    size, = struct.unpack("!q", buf.getvalue()) # was !i 
    if maxsize is not None and size > maxsize: 
     return None 
    return self._recv(size) 

当然不能保证这种改变是足够的;你需要阅读周围代码的其余部分,并测试它。


注:我怀疑代替Pipe的使用Queue解决不了问题,因为我怀疑Queue的使用类似的酸洗中间步骤。

那么,这个问题与酸洗没有关系。 Pipe未使用pickle发送长度,它使用struct。您可以验证pickle不会有这个问题:pickle.loads(pickle.dumps(1<<100)) == 1<<100将返回True

(在早期版本中,pickle有巨大的物体 - 例如,一个list 2G的元素,这可能在约8倍高,您目前正在打一个规模引起的问题的。但是,这是问题被修复了3.3)。

同时...只是尝试一下,看看,而不是通过挖掘来源,试图找出它是否会工作?


另外,你确定你真的想通过隐式酸洗传递2GB数据结构吗?

如果我正在做一些缓慢且内存不足的事情,我宁愿将这些明确的例如pickle放到临时文件并发送路径或fd。 (如果您使用numpypandas什么的,使用它的二进制文件格式,而不是pickle,但同样的想法。)

,或者甚至更好,共享数据。是的,可变的共享状态不好...但分享不可改变对象是好的。无论你有2GB的内存,你可以把它放在一个multiprocessing.Array,或者把它放在一个​​数组或结构体(数组或结构体)中,你可以通过multiprocessing.sharedctypes或​​分享它,file你可以分享mmap双方,还是......?有一些额外的代码来定义和挑选结构,但是如果好处可能很大,那么值得尝试。


最后,当你认为你已经找到了在Python中的错误/显失公平功能/不合理的限制,这是值得期待的bug跟踪系统。它看起来像issue 17560: problem using multiprocessing with really big objects?正是您的问题,并且有很多信息,包括建议的解决方法。

+0

哇,非常感谢您提供了非常全面的答案!为了解决您的一些观点:传递2GB数据结构的问题不是一个问题,直到最近还有一个新的模拟模型,它将大量的数据吐出,看起来我需要重新设计内部结构,以更明确地进行序列化和数据传递,或者将所需的逻辑从主进程移动到结果过程并完全消除这个问题,但我没有甚至考虑看看bug跟踪器,所以也要感谢! –

+0

@CollinM:这听起来像是现在增加解决方法是有意义的,并且开始着手研究改变内部结构(不管是少量数据,还是以不同方式分享数据)。希望能帮助到你。 – abarnert

相关问题