2016-03-24 31 views
1

我想与只能接受单个TCP连接(内存约束)的设备进行接口,因此只需为每个工作线程启动连接都不是一个选项,因为它具有正常客户端 - 服务器情况,如数据库连接。在Celery任务中使用多处理并发机制

我已经使用了多重经理字典就是线程之间全局访问尝试,格式为:

clients{(address, port): (connection_obj, multiprocessing.Manager.RLock)}

而像这样一个任务:

from celery import shared_task 
from .celery import manager, clients 

@shared_task 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    # Create client connection if one does not exist. 
    conn = None 
    addr, port = controller 
    if controller not in clients: 
     conn = Client(addr, port) 
     conn.connect() 
     lock = manager.RLock() 
     clients[controller] = (conn, lock,) 
     print("New controller connection to %s:%s" % (addr, port,)) 
    else: 
     conn, lock = clients[controller] 

    try: 
     f = getattr(conn, commandname) # See if connection.commandname() exists. 
    except Exception: 
     raise Exception("command: %s not known." % (commandname)) 

    with lock: 
     res = f(*args) 
     return res 

但是任务会出现序列化错误,例如:

_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed

即使我没有使用不可序列化的值调用任务,并且任务没有尝试返回一个不可序列化的值Celery似乎着迷于试图序列化这个全局对象?

我缺少什么?你将如何在Celery任务中使用线程安全的客户端设备连接并在线程之间访问?示例代码?

+0

我不确定这是否适合您的情况,但我只记得阅读'multiprocessing.reduction',它应该允许在进程之间共享套接字连接。 [见这篇博文的例子](http://foobarnbaz.com/2011/08/30/developing-scalable-services-with-python/)。 – antikantian

+0

客户端不使用原始套接字,它是具有协议的Twisted连接对象。使用原始套接字或重新构建fd中的Twisted连接对象并不重要。 –

+0

我最终研究了如何在现有套接字周围包装一个Twisted协议,但是在我的情况下它不起作用,因为Celery消费者作为*工作主进程的独立子进程*无法访问需要的文件描述符(存储在Redis中),并建立unix管道的共享FD的纠纷是太多hackery。 与我的情况有关的问题是设备是内存受限,并且不能只是有多个连接...所以我决定只有一个工作人员与一个消费者和一个设备。不太好! –

回答

0
... 
self._send_bytes(ForkingPickler.dumps(obj)) 
File "/usr/lib64/python3.4/multiprocessing/reduction.py", line 50, in dumps 
cls(buf, protocol).dump(obj) 
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed 

环顾网络后,我意识到我可能错过了追踪中重要的东西。看了回溯后,我意识到这不是Celery试图腌制连接对象,而是Multiprocessing.reduction。减量用于连续化并在另一侧重新构建。

我有几个替代方法来解决这个问题 - 但是他们没有真正做我最初想要的,就是借用客户端库连接对象并使用它,这对于多处理和prefork是不可能的。

+0

啊,我想我的回答有点仓促,因为你想传递相同的连接对象。 Mutliprocessing和prefork通常不会很好地处理进程之间的连接和I/O。你通常想建立一个连接post-fork。您是否考虑从prefork切换到eventlet或gevent以实现并发性,然后实现连接池? – antikantian

0

如何使用Redis实现分布式锁管理器? Redis python客户端具有内置的锁定功能。另请参阅redis.io上的this doc。即使您使用RabbitMQ或其他经纪商,Redis也非常轻巧。

例如,作为装饰:

from functools import wraps 

def device_lock(block=True): 
    def decorator(func): 
     @wraps(func) 
     def wrapper(*args, **kwargs): 
      return_value = None 
      have_lock = False 
      lock = redisconn.lock('locks.device', timeout=2, sleep=0.01) 
      try: 
       have_lock = lock.acquire(blocking=block) 
       if have_lock: 
        return_value = func(*args, **kwargs) 
      finally: 
       if have_lock: 
        lock.release() 
      return return_value 
     return wrapper 
    return decorator 

@shared_task 
@device_lock 
def send_command(controller, commandname, args): 
    """Send a command to the controller.""" 
    ... 

您还可以使用this approach从芹菜任务食谱:

from celery import task 
from celery.utils.log import get_task_logger 
from django.core.cache import cache 
from hashlib import md5 
from djangofeeds.models import Feed 

logger = get_task_logger(__name__) 

LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes 

@task(bind=True) 
def import_feed(self, feed_url): 
    # The cache key consists of the task name and the MD5 digest 
    # of the feed URL. 
    feed_url_hexdigest = md5(feed_url).hexdigest() 
    lock_id = '{0}-lock-{1}'.format(self.name, feed_url_hexdigest) 

    # cache.add fails if the key already exists 
    acquire_lock = lambda: cache.add(lock_id, 'true', LOCK_EXPIRE) 
    # memcache delete is very slow, but we have to use it to take 
    # advantage of using add() for atomic locking 
    release_lock = lambda: cache.delete(lock_id) 

    logger.debug('Importing feed: %s', feed_url) 
    if acquire_lock(): 
     try: 
      feed = Feed.objects.import_feed(feed_url) 
     finally: 
      release_lock() 
     return feed.url 

    logger.debug(
     'Feed %s is already being imported by another worker', feed_url) 
+0

我知道这些解决方案 - 但是我没有使用它的原因是因为它没有做我想要的,只是在进程之间共享实际的连接对象并使用已经打开的连接。我试图避免每次运行任务时断开连接并重新连接。 我可以保持连接对象为全局的,如果我用一个单独的线程运行worker并重用它。我正在考虑为这些客户使用一批单一流程的员工。否则,如果我确实选择仅在每次发送消息时连接,那么我将使用Redis进行锁定。在其他解决方案... –

0

你试图使用GEVENT或eventlet芹菜工人,而不是过程和线程?在这种情况下,您将能够使用全局变量或threading.local()来共享连接对象。

+0

我正在使用eventlet锁定。我可以更努力地找出原因,但是由于阻止IO性质,我尝试做的事情对于eventlet/gevent的事件循环本质是不合适的。 –