我想用芹菜来管理任务。 我现在遇到的问题是,我有很多次要任务(电子邮件,跨服务器帖子等) 和时间消耗任务,如文件上传。芹菜,一个再见一个队列和并行许多在队列中
有没有什么方法来指定上传总是会一个接一个的。只有一项任务能够及时执行,而其他工作人员则会在其他队列中工作?
我想用芹菜来管理任务。 我现在遇到的问题是,我有很多次要任务(电子邮件,跨服务器帖子等) 和时间消耗任务,如文件上传。芹菜,一个再见一个队列和并行许多在队列中
有没有什么方法来指定上传总是会一个接一个的。只有一项任务能够及时执行,而其他工作人员则会在其他队列中工作?
序列化任务执行的有效方法是使用互斥(互斥)。
Python的threading
模块有a Lock
object可以是used to this effect:
# ...
module_lock = threading.Lock() # or make this an attribute in an object with sufficiently-large scope
# ...
def do_interesting_task():
with module_lock.acquire():
interesting_task()
“放弃所有希望,你们谁在此处输入”。
互斥锁和信号量是强大的工具,但不明智地使用它们会产生僵局,偶尔会吃掉你的午餐。
同时我已经实施了这样的解决方案,效果很好。 但是,我不太确定,max_retries = None表示将有无限次的重试次数。 该解决方案适用于redis,但可以在支持atomicaly增量操作的任何其他引擎上工作。
@task(max_retries=None,default_retry_delay=3)
def sleepTask():
if r.incr('sleep_working')>1:
r.incr('sleep_working',-1)
sleepTask.retry()
else:
try:
r.expire('sleep_working',3600)
sleep(30)
finally:
r.incr('sleep_working',-1)
return True
这里的关键是,增量是原子,所以它永远不会发生,这两个客户端接收计数器== 1。
也到期是非常重要的,任何事情都可能发生,我们将永远得到我们的计数器> 1,所以到期确认,无论如何,经过特定的时间计数器将被丢弃。这个值可以根据需要进行调整。我的大文件上传,所以3600听起来不错。
我认为这是一个很好的起点,通过接收redis_key和expire_time值来创建自定义Task对象,它将自动执行所有这些操作。我会更新这个帖子,如果我会做这样的任务。
作为奖励,这种解决方案也可以容易地通过改变> 1至>任何数量
我不认为这会同时工作的工人调节至2/3 /任何其他数量的并行限制。每个工人都会有全新的module_lock,或者我错了?无论如何,现在张贴我解决这个问题的方式。会很高兴听到它的意见,也许它有一些严重的缺乏,我没有看到第一种方法 – Tigra 2011-12-16 11:26:11