我想与只能接受单个TCP连接(内存约束)的设备进行接口,因此只需为每个工作线程启动连接都不是一个选项,因为它具有正常客户端 - 服务器情况,如数据库连接。在Celery任务中使用多处理并发机制
我已经使用了多重经理字典就是线程之间全局访问尝试,格式为:
clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}
而像这样一个任务:
from celery import shared_task
from .celery import manager, clients
@shared_task
def send_command(controller, commandname, args):
"""Send a command to the controller."""
# Create client connection if one does not exist.
conn = None
addr, port = controller
if controller not in clients:
conn = Client(addr, port)
conn.connect()
lock = manager.RLock()
clients[controller] = (conn, lock,)
print("New controller connection to %s:%s" % (addr, port,))
else:
conn, lock = clients[controller]
try:
f = getattr(conn, commandname) # See if connection.commandname() exists.
except Exception:
raise Exception("command: %s not known." % (commandname))
with lock:
res = f(*args)
return res
但是任务会出现序列化错误,例如:
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
即使我没有使用不可序列化的值调用任务,并且任务没有尝试返回一个不可序列化的值Celery似乎着迷于试图序列化这个全局对象?
我缺少什么?你将如何在Celery任务中使用线程安全的客户端设备连接并在线程之间访问?示例代码?
我不确定这是否适合您的情况,但我只记得阅读'multiprocessing.reduction',它应该允许在进程之间共享套接字连接。 [见这篇博文的例子](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian
客户端不使用原始套接字,它是具有协议的Twisted连接对象。使用原始套接字或重新构建fd中的Twisted连接对象并不重要。 –
我最终研究了如何在现有套接字周围包装一个Twisted协议,但是在我的情况下它不起作用,因为Celery消费者作为*工作主进程的独立子进程*无法访问需要的文件描述符(存储在Redis中),并建立unix管道的共享FD的纠纷是太多hackery。 与我的情况有关的问题是设备是内存受限,并且不能只是有多个连接...所以我决定只有一个工作人员与一个消费者和一个设备。不太好! –