4
我正在使用Celery运行数千个任务组, ,其中每个任务需要几分钟的时间才能运行。下面 的代码是multiprocessing.pool.Pool.map
我简单的直接替代:工作人员死后,重新启动组中的长时间任务
def map(task, data):
"""
Perform the *task* on *data* in distributed way. Blocks until finished.
"""
ret = celery_module.group(task.s(val) for val in data).apply_async()
return ret.get(interval = 0.1)
这只要工人从来没有打破就像一个魅力。 但是有时候会发生一个节点死掉的情况,需要执行几个正在运行的任务。 然后会发生什么是所有其他任务完成,工人变得闲置, ,但get
永远等待死亡工人的结果。
如何使某些超时后死的任务重试? 任务是幂等的,我根本不担心重复执行。 我试图玩弄CELERY_ACKS_LATE
并在这里和那里放超时, 但似乎没有任何补救措施。 我觉得我错过了明显的东西,但找不到什么。
编辑:代理和结果使用的传输是Redis。
我不确定是否重新运行计算需要花费几个小时处理大约一百个内核,因为需要几分钟的操作失败实际上是“正确的行为”... – lRem