2013-10-16 26 views
2

我在我的数据库中保留对链的引用。python芹菜:如何将任务追加到旧链中

from tasks import t1, t2, t3 
from celery import chain 
res = chain(t1.s(123)|t2.s()|t3.s())() 
res.get() 

我如何追加一个其他任务,这个特殊的链条?

res.append(t2.s()) 

我的目标是确保链在我的代码中指定的相同顺序执行。 如果一个任务在我的链中失败,则不执行以下任务。

要知道我在指定队列中使用超级大任务。

回答

4

所有信息都包含在消息中。

消息可能在传输中,也许在世界的另一边,或者它们可能被中间处理器使用。出于这个原因,在发送消息后不能修改消息。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#state

我的目标是确保链在我的代码中指定的相同顺序执行。如果一个任务在我的链中失败,则不执行以下任务。

您可以肯定的是,订单作为消息的一部分发送,如果任何任务失败,它将不会继续 。

现在,如果您真的想在运行时添加任务,那么您可以将 信息存储在数据库中,并让任务自己检查并调用新任务。 这样做虽然时有一些挑战:

1)链中的第一项任务将调用下一个任务如果成功, 那么接下来的任务将调用之后等下一个任务。

2)如果你添加一个任务到这个进程,如果第一个任务已经执行会发生什么? 还是第二个,还是第三个?

因此,您可能会猜测这需要一些繁重的同步工作。

我想一个更简单的解决办法是创建等待一个任务一个任务来完成 然后应用回调:

from celery import subtask 
from celery.result import from_serializable 

@app.task(bind=True) 
def after_task(self, result, callback, errback=None): 
    result = from_serializable(result) 
    if not result.ready(): 
     raise self.retry(countdown=1) 
    if task.successful(): 
     subtask(callback).delay(result.get()) 
    else: 
     if errback: 
      subtask(errback)() 


def add_to_chain(result, callback, errback=None): 
    callback = callback.clone()  # do not modify caller 
    new_result = callback.freeze() # sets id for callback, returns AsyncResult 
    new_result.parent = result 
    after_task.delay(result.serializable(), callback, errback) 
    return new_result 

然后你可以使用它像这样:

from tasks import t1, t2, t3 

res = (t1.s(123) | t2.s() | t3.s())() 
res = add_to_chain(t2.s()) 

注意:

bind=True是即将发布的3.1版本中的新功能,对于旧版本 您必须删除自己参考和使用current_task.retry(得到这from celery import current_task)。

Signature.freeze也是新的3。1,做 同样在旧版本中,你可以使用:

from celery import uuid 

def freeze(sig, _id=None): 
    opts = sig.options 
    try: 
     tid = opts['task_id'] 
    except KeyError: 
     tid = opts['task_id'] = _id or uuid() 
    return sig.AsyncResult(tid) 
+0

谢谢@asksol,我会读你的答案,并试图正确理解 –