2014-09-29 49 views
0

我正在开发Python中的一个小型irc客户端(版本2.7)。我希望利用多从我目前连接到所有的服务器阅读,但我遇到了一个问题,使用多处理从多个套接字获取信息

import socket 
import multiprocessing as mp 
import types 
import copy_reg 
import pickle 


def _pickle_method(method): 
    func_name = method.im_func.__name__ 
    obj = method.im_self 
    cls = method.im_class 
    return _unpickle_method, (func_name, obj, cls) 

def _unpickle_method(func_name, obj, cls): 
    for cls in cls.mro(): 
     try: 
      func = cls.__dict__[func_name] 
     except KeyError: 
      pass 
     else: 
      break 
    return func.__get__(obj, cls) 

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method) 

class a(object): 

    def __init__(self): 
     sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     sock1.connect((socket.gethostbyname("example.com"), 6667)) 
     self.servers = {} 
     self.servers["example.com"] = sock1 

    def method(self, hostname): 
     self.servers[hostname].send("JOIN DAN\r\n") 
     print "1" 

    def oth_method(self): 
     pool = mp.Pool() 
     ## pickle.dumps(self.method) 
     pool.map(self.method, self.servers.keys()) 
     pool.close() 
     pool.join() 

if __name__ == "__main__": 
    b = a() 
    b.oth_method() 

当它击中线pool.map(self.method, self.servers.keys())我得到的错误

TypeError: expected string or Unicode object, NoneType found 

从我读过的内容来看,当我尝试腌制不可挑剔的东西时会发生什么。为了解决这个问题,我首先制作了_pickle_method_unpickle_method,如here所述。然后我意识到我(最初)试图通过pool.map()套接字列表(非常不可用),因此我将其更改为主机名列表,因为可以对字符串进行酸洗。不过,我仍然遇到这个错误。

然后我试着直接拨打pickle.dumps()self.method,self.servers.keys()self.servers.keys()[0]。正如预期的那样,它的工作罚款后两者,但是从我第一次拿到

TypeError: a class that defines __slots__ without defining __getstate__ cannot be pickled. 

一些更多的研究,导致我this question,这似乎表明,这个问题与使用插座(和gnibbler's answer这个问题似乎证实了这一点)。

有没有一种方法可以实际为此使用多处理?从我已经(非常简要地)阅读pathos.multiprocessing可能是我需要的,但我真的很想坚持标准库,如果可能的话。

我也没有设置使用多处理 - 如果多线程会更好地工作,并避免这个问题,那么我比这些解决方案更开放。

+0

你是否真的试图将一个套接字传递给子进程,还是只是你试图避免的偶然发生的事情?对于前者,你需要迁移套接字,这必须在比Python酸洗更低的层次上完成,并且对于每个平台都是不同的,因为在封面之下,套接字只是文件描述符的包装,而你需要操作系统使相同的文件描述符意味着您的子进程中使用相同的套接字。 – abarnert 2014-09-29 08:20:39

+0

同时,您是否有理由在第一时间使用多处理而不是多线程呢?“从一堆服务器中读取”就像你可以得到I/O约束的范例一样,这正是线程的优点。 – abarnert 2014-09-29 08:21:50

+0

不,我将子进程的字符串键传递给引用套接字的字典。子进程然后使用字符串键来访问套接字,做套接字的东西,然后返回。我使用多处理而不是多线程的原因是因为我是多新的东西,而且我读到python中的线程速度很慢。话虽如此,我对多线程解决方案非常开放 – Dannnno 2014-09-29 08:22:41

回答

3

您的根本问题是您无法将套接字传递给子进程。简单的解决方案是使用线程代替。

更详细地:


酸洗绑定方法需要酸洗三两件事:函数名,对象和类。 (我认为multiprocessing会自动为你做这件事,但是你手动做,这很好。)要腌制对象,你必须腌制它的成员,在你的情况下,它包括一个字典,其值是套接字。

您不能在Python 2.x中使用默认酸洗协议来使用套接字。链接问题的答案解释了原因,并提供了简单的解决方法:不要使用默认的酸洗协议。但socket还有一个额外的问题;它只是一个围绕在C扩展模块中定义的类型的包装,它在酸洗时有它自己的问题。你也许能够解决这个问题...

但是,这仍然没有帮助。在封面之下,C扩展类本身就是一个文件描述符的包装。文件描述符只是一个数字。您的操作系统将文件描述符映射为打开每个进程的套接字(以及文件和管道等);一个进程中的文件#4不是另一个进程中的文件#4。因此,您需要实际将套接字的文件描述符迁移到操作系统级别的子级。这不是一件简单的事情,它在每个平台上都有所不同。而且,当然,除了迁移文件描述符之外,还必须传递足够的信息来重新构造socket对象。所有这些都是可行的。甚至可能会有一个图书馆为你包装它。但这并不容易。


另一种可能性是在启动任何孩子之前打开所有的套接字,并将它们设置为由孩子们继承。但是,即使你可以重新设计你的代码来完成这些事情,这只能在POSIX系统上运行,而不能在Windows上运行。


更简单的可能性是只使用线程而不是进程。如果你正在做CPU绑定的工作,线程在Python中有问题(CPython,你几乎可以肯定使用的实现),因为全局解释器锁可以防止两个线程同时解释代码。但是当你的线程花费所有时间在socket.recv和类似的I/O调用上等待时,使用线程没有问题。他们避免了酸洗数据和迁移套接字等所有开销和复杂性。

您可能会注意到threading模块没有很好的Pool类,例如multiprocessing。但令人惊讶的是, stdlib中的一个线程池类 - 它只是在multiprocessing中。您可以访问它作为multiprocessing.dummy.Pool

如果你愿意超越stdlib,Python 3的concurrent.futures模块有一个名为futures的backport,你可以在PyPI上安装它。它包括一个ThreadPoolExecutor这是一个稍微更高层次的抽象,可能更容易使用。但Pool也应该在这里适合你,并且你已经编写了代码。

+0

谢谢,这是有道理的。我会在早上看看它,看看我能不能让它工作 – Dannnno 2014-09-29 08:41:27

1

如果想尝试跳出标准库,那么对于pathos.multiprocessing下面的代码(你提到)不应该抛出酸洗错误,因为dill串行知道如何序列套接字和文件句柄。然而

>>> import socket 
>>> import pathos.multiprocessing as mp 
>>> import types 
>>> import dill as pickle 
>>> 
>>> class a(object): 
... def __init__(self): 
...  sock1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
...  sock1.connect((socket.gethostbyname("example.com"), 6667)) 
...  self.servers = {} 
...  self.servers["example.com"] = sock1 
... def method(self, hostname): 
...  self.servers[hostname].send("JOIN DAN\r\n") 
...  print "1" 
... def oth_method(self): 
...  pool = mp.ProcessingPool() 
...  pool.map(self.method, self.servers.keys()) 
...  pool.close() 
...  pool.join() 
... 
>>> b = a() 
>>> b.oth_method() 

的一个问题是,你需要用系列化multiprocessing,而且在许多情况下,插座会序列,使反序列化的套接字被关闭。原因主要是因为文件描述符未按预期复制,所以通过引用复制。使用dill,您可以自定义文件句柄的序列化,以便传输内容而不是使用引用...但是,这对于套接字(至少在此刻)没有很好的转换。

我是dillpathos作家,我有,你可能不想与multiprocessing(至少不是存储地图服务器,插座)要做到这一点@abarnert同意。如果您想使用multiprocessing's线程接口,并且您发现遇到任何序列化问题,pathos.multiprocessing确实有mp.ThreadingPool()而不是mp.ProcessingPool(),因此您可以访问multiprocessing.dummy.Pool的包装,但仍可获得pathos提供的附加功能(例如多阻塞或异步管道和映射的参数池等)。

+0

我很好奇你是如何序列化文件句柄的。它不像'unix_sock.sendmsg(sock)'和'sock.share(pid)'完全难以编写,但将它们包装在进程池接口中似乎有点让人头疼。 (另外,在Windows上处理套接字和文件是不同的事情......)但是这可能是在这里继续的话题,所以我会去下载你的模块并读取它。 :) – abarnert 2014-09-29 19:44:41

+0

@abarnert:没有什么聪明的插座了。有几个选项(最近添加在https://github.com/uqfoundation/dill上,而不是在当前版本中)用于处理序列化文件......并且它需要在Windows上进行测试。我知道Windows和任何其他操作系统之间的区别。 'multiprocessing'的分支非常简单,我只需替换序列化程序并在顶部添加一个小小的说服层,例如,允许'map'采用多个参数。 – 2014-09-29 22:42:48