2012-11-09 28 views
4

我正在使用Celery运行数千个任务组, ,其中每个任务需要几分钟的时间才能运行。下面 的代码是multiprocessing.pool.Pool.map我简单的直接替代:工作人员死后,重新启动组中的长时间任务

def map(task, data): 
    """ 
    Perform the *task* on *data* in distributed way. Blocks until finished. 
    """ 
    ret = celery_module.group(task.s(val) for val in data).apply_async() 
    return ret.get(interval = 0.1) 

这只要工人从来没有打破就像一个魅力。 但是有时候会发生一个节点死掉的情况,需要执行几个正在运行的任务。 然后会发生什么是所有其他任务完成,工人变得闲置, ,但get永远等待死亡工人的结果。

如何使某些超时后死的任务重试? 任务是幂等的,我根本不担心重复执行。 我试图玩弄CELERY_ACKS_LATE并在这里和那里放超时, 但似乎没有任何补救措施。 我觉得我错过了明显的东西,但找不到什么。

编辑:代理和结果使用的传输是Redis。

回答

0

这里的正确行为是设置超时以及何时死亡重试整个map任务。

+1

我不确定是否重新运行计算需要花费几个小时处理大约一百个内核,因为需要几分钟的操作失败实际上是“正确的行为”... – lRem

相关问题