2011-12-15 31 views
1

我想用芹菜来管理任务。 我现在遇到的问题是,我有很多次要任务(电子邮件,跨服务器帖子等) 和时间消耗任务,如文件上传。芹菜,一个再见一个队列和并行许多在队列中

有没有什么方法来指定上传总是会一个接一个的。只有一项任务能够及时执行,而其他工作人员则会在其他队列中工作?

回答

0

序列化任务执行的有效方法是使用互斥(互斥)。

Python的threading模块有a Lock object可以是used to this effect

# ... 
module_lock = threading.Lock() # or make this an attribute in an object with sufficiently-large scope 
# ... 
def do_interesting_task(): 
    with module_lock.acquire(): 
     interesting_task() 

“放弃所有希望,你们谁在此处输入”。

互斥锁和信号量是强大的工具,但不明智地使用它们会产生僵局,偶尔会吃掉你的午餐。

+3

我不认为这会同时工作的工人调节至2/3 /任何其他数量的并行限制。每个工人都会有全新的module_lock,或者我错了?无论如何,现在张贴我解决这个问题的方式。会很高兴听到它的意见,也许它有一些严重的缺乏,我没有看到第一种方法 – Tigra 2011-12-16 11:26:11

0

同时我已经实施了这样的解决方案,效果很好。 但是,我不太确定,max_retries = None表示将有无限次的重试次数。 该解决方案适用于redis,但可以在支持atomicaly增量操作的任何其他引擎上工作。

@task(max_retries=None,default_retry_delay=3) 
def sleepTask(): 
    if r.incr('sleep_working')>1: 
     r.incr('sleep_working',-1) 
     sleepTask.retry() 
    else: 
     try: 
      r.expire('sleep_working',3600) 
      sleep(30) 
     finally: 
      r.incr('sleep_working',-1) 
     return True 

这里的关键是,增量是原子,所以它永远不会发生,这两个客户端接收计数器== 1。

也到期是非常重要的,任何事情都可能发生,我们将永远得到我们的计数器> 1,所以到期确认,无论如何,经过特定的时间计数器将被丢弃。这个值可以根据需要进行调整。我的大文件上传,所以3600听起来不错。

我认为这是一个很好的起点,通过接收redis_key和expire_time值来创建自定义Task对象,它将自动执行所有这些操作。我会更新这个帖子,如果我会做这样的任务。

作为奖励,这种解决方案也可以容易地通过改变> 1至>任何数量