9

之间的对象有三个问题作为可能的重复(但太具体):多重分享不可序列化进程

通过回答这个问题,其他三个问题可以回答。 希望我说清楚:

当我创建通过创建多处理一些过程中的对象:

  1. 我如何通过一个参考该对象到其他进程?
  2. (不是那么重要)我如何确定这个过程在我参考时不会死?

实施例1(解决)

from concurrent.futures import * 

def f(v): 
    return lambda: v * v 

if __name__ == '__main__': 
    with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor 
     l = list(e.map(f, [1,2,3,4])) 
    print([g() for g in l]) # [1, 4, 9, 16] 

实施例2

假设f返回与可变状态的对象。这个相同的对象应该可以从其他进程访问。

例3

我已经拥有一个打开的文件和锁定的对象 - 我怎么授予访问其他进程?

提醒

我不希望不会出现此特定错误。或者这个特定用例的解决方案。解决方案应该足够普遍,以便在进程之间共享不可移动的对象。这些对象可以在任何进程中创建。使所有物体可移动并保持身份的解决方案也可以很好。

欢迎提供任何提示,任何指向如何实施解决方案的部分解决方案或代码片段都是值得的。所以我们可以一起创建解决方案。

这里是一个尝试解决这个,但没有多:https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

问题

What you want the other processes to do with the references?

的引用可以通过与多创建的任何其它工艺(重复3)。可以访问属性,调用引用。访问attibutes可能或可能不是代理。

What's the problem with just using a proxy?

也许有没有问题,但一个挑战。我的印象是一个代理有一个管理器,并且一个管理器有它自己的进程,所以这个不可序列化的对象必须被序列化和转移(部分用StacklessPython/fork解决)。 也存在特殊对象的代理 - 为所有对象构建代理(可解决)很难但并非不可能。

解决方案? - 代理+经理?

Eric Urban表明序列化不是问题。真正的挑战在于Example 3 & 3:状态的同步。我对解决方案的想法是为经理创建一个特殊的代理类。该代理类

  1. 花费不可序列化对象constuctor
  2. 将可序列化对象,并将其传输到管理器进程。
  3. (问题)根据1.必须在管理器进程中创建不可序列化的对象。
+1

应编辑该问题以解释您希望其他过程如何处理参考。只有将它们传回原始过程? –

+0

编辑它。告诉我,如果这不能回答这个问题,谢谢。 – User

+0

只使用代理有什么问题? – Kritzefitz

回答

9

大多数情况下,将现有对象的引用传递给另一个进程并不是真的需要。相反,你创建你的类,你要在进程间共享:

class MySharedClass: 
    # stuff... 

然后你做代理的经理是这样的:

import multiprocessing.managers as m 
class MyManager(m.BaseManager): 
    pass # Pass is really enough. Nothing needs to be done here. 

然后你对管理器注册类,像这样:

MyManager.register("MySharedClass", MySharedClass) 

然后,一旦管理员立即启动,并使用manager.start(),您可以使用manager.MySharedClass创建您的班级的共享实例。这应该适用于所有需求。返回的代理服务器与原始对象完全相同,但documentation中描述的一些例外情况除外。

+0

这太棒了!我测试了它,它工作得很好。 http://codepad.org/zW2LU6XV还有并发问题。但这些都没问题。 – User

+0

虽然这并不能解决问题。我用这段代码作为MySharedClass的模板,它有一个(模拟)数据库游标。如果我尝试在MySharedClass方法中返回它,我会收到Unserializable Message错误。 – sinwav

+0

@sinwav据我所知,在进程之间共享数据库光标是不可能的。无论您在两个进程之间使用何种传输机制,在某些时候该对象需要以某种方式进行序列化。 Python为此使用酸洗。如果你不能腌一些东西,那就有其原因。使用数据库游标的问题是,游标只在它创建的连接上有效,但该连接仅在一个进程中打开。这表明数据库游标仅在创建它的过程中有效。这里没有可能的分享。 – Kritzefitz

3

只需使用stackless python即可。你可以用pickle序列化几乎任何东西,包括函数。在这里,我使用pickle模块序列化并反序列化lambda。这与您在示例中尝试执行的操作类似。

这里是下载链接Stackless Python才能http://www.stackless.com/wiki/Download

Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) 
[GCC 4.6.3] on linux2 
Type "help", "copyright", "credits" or "license" for more information. 
>>> f = 5 
>>> g = lambda : f * f 
>>> g() 
25 
>>> import pickle 
>>> p = pickle.dumps(g) 
>>> m = pickle.loads(p) 
>>> m() 
25 
>>> 
+0

+1这很好,但1.确实是保留身份m是g和2.如果我序列化函数并在其他进程中反序列化它,它会在原始进程中调用吗? - 没有。但是,如果在关闭进程时需要保存该函数,这绝对是一个很好的解决方案。 – User

4

阅读这个答案之前,请注意,该解决方案解释它是可怕的。请在答案结尾处注意警告。

我找到了一种通过multiprocessing.Array共享对象状态的方法。 所以我做了这个类,透明地通过所有的进程共享它的状态:

import multiprocessing as m 
import pickle 

class Store: 
    pass 

class Shareable: 
    def __init__(self, size = 2**10): 
     object.__setattr__(self, 'store', m.Array('B', size)) 
     o = Store() # This object will hold all shared values 
     s = pickle.dumps(o) 
     store(object.__getattribute__(self, 'store'), s) 

    def __getattr__(self, name): 
     s = load(object.__getattribute__(self, 'store')) 
     o = pickle.loads(s) 
     return getattr(o, name) 

    def __setattr__(self, name, value): 
     s = load(object.__getattribute__(self, 'store')) 
     o = pickle.loads(s) 
     setattr(o, name, value) 
     s = pickle.dumps(o) 
     store(object.__getattribute__(self, 'store'), s) 

def store(arr, s): 
    for i, ch in enumerate(s): 
     arr[i] = ch 

def load(arr): 
    l = arr[:] 
    return bytes(arr) 

你可以通过这个类的实例(和它的子类)到任何其他的过程,它会同步这是国家通过的所有进程。 这与该代码进行测试:

class Foo(Shareable): 
    def __init__(self): 
     super().__init__() 
     self.f = 1 

    def foo(self): 
     self.f += 1 

def f(s): 
    s.f += 1 

if __name__ == '__main__': 
    import multiprocessing as m 
    import time 
    s = Foo() 
    print(s.f) 
    p = m.Process(target=f, args=(s,)) 
    p.start() 
    time.sleep(1) 
    print(s.f) 

“神奇”的这个类是它存储在类Store的另一个实例这一切属性。这个班不是很特别。这只是一些可以具有任意属性的类。 (字典也会这样做。)

但是,这个类有一些非常讨厌的怪癖。我找到了两个。

第一个怪癖是你必须指定最多需要多少空间Store实例。这是因为multiprocessing.Array有一个静态大小。所以可以腌制的对象只能和数组一样大。

第二个怪癖是你不能在ProcessPoolExecutors或简单池中使用这个类。如果你尝试这样做,你会得到一个错误:

>>> s = Foo() 
>>> with ProcessPoolExecutor(1) as e: 
...  e.submit(f, args=(s,)) 
... 
<Future at 0xb70fe20c state=running> 
Traceback (most recent call last): 
<omitted> 
RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance 

警告
你可能不应该使用这种方法,因为它使用的内存不可控的量相比,使用代理过于复杂(看到我的其他答案),并可能以惊人的方式崩溃。

相关问题