2011-07-05 70 views
6

我想要Celery任务取决于2个或更多其他任务的结果。我已经看过Python+Celery: Chaining jobs?http://pypi.python.org/pypi/celery-tasktree,但只有任务只有一项依赖任务时,这些才是好的。依赖图执行Celery任务

我知道TaskSet,但似乎没有办法在TaskSetResult.ready()变为True时立即执行回调。我现在想的是有一个周期性任务,每隔几毫秒左右轮询TaskSetResult.ready(),并在回调函数触发它,因为它返回True,但对我来说听起来相当不雅。

有什么建议吗?

回答

2

mrbox是真的,您可以重试,直到结果准备好,但在文档中不太清楚,当您重试时必须通过setid和子任务元素,并且为了恢复它必须使用映射函数,下面有一个示例代码来解释我的意思。

def run(self, setid=None, subtasks=None, **kwargs): 

    if not setid or not subtasks: 
     #Is the first time that I launch this task, I'm going to launch the subtasks 
     … 
     tasks = [] 
     for slice in slices: 
      tasks.append(uploadTrackSlice.subtask((slice,folder_name))) 

     job = TaskSet(tasks=tasks) 
     task_set_result = job.apply_async() 
     setid = task_set_result.taskset_id 
     subtasks = [result.task_id for result in task_set_result.subtasks] 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 

    #Is a retry than we just have to check the results   
    tasks_result = TaskSetResult(setid, map(AsyncResult,subtasks)) 
    if not tasks_result.ready(): 
     self.retry(exc=Exception("Result not ready"), args=[setid,subtasks]) 
    else:  
     if tasks_result.successful(): 
      return tasks_result.join() 
     else: 
      raise Exception("Some of the tasks was failing") 
2

恕我直言,你可以做某事类同于docs- link

或者你可以使用重试方法做与MAX_RETRIES =无的东西 - 如果“基地”的任务。就绪一个()是假的,你可以fire .retry()方法,直到完成“基本”任务。

7

在最新版本的芹菜(3.0 +),可以使用所谓的和弦达到预期的效果:

http://docs.celeryproject.org/en/latest/userguide/canvas.html#the-primitives

简单的和弦

和弦原语使我们能够添加回调,当一个组中的任务的所有 都完成执行时,这通常是算法th所需的 在不尴尬的并行:

>>> from celery import chord 
>>> res = chord((add.s(i, i) for i in xrange(10)), xsum.s())() 
>>> res.get() 
90 

声明:我没有尝试这样做我自己呢。

相关问题