2017-07-16 48 views
1

我有一个芹菜任务返回一个列表。在此之后,我希望将列表中的每个元素发送到2任务链。据我所见,这就像是chord的反面。即而不是将一个任务作为group的回调,我想要一个group任务作为单个任务的回调。芹菜组作为回调

喜欢的东西: group(chain(validate.s(i) | run.s(i))() for i in results_from_first_task)

是有办法的第一个任务完成后自动执行这一群体?

作为一个简单的例子,假设一个简单的任务返回的文件列表:

@app.task() 
def list_files(pattern): 
    return glob.glob(pattern) 

而另一对夫妇,其在单个文件执行操作任务:

@app.task() 
def validate(path): 
    return my_validation_function(path) 

@app.task() 
def run(path): 
    return my_run_function(path) 

我想从list_files的结果中为每个条目执行validaterun

回答

0

您可以使用芹菜信号排队您的任务。

from celery.signals import task_success 


@task_success.connect() 
def task_success_handler(sender=None, headers=None, body=None, **kwargs): 
    result = kwargs['result'] 
    for file in result: 
     validate.apply_async(file) 
     run.apply_async(file) 

或者,你可以创建一个中间任务,并用它来排队等任务

@app.task() 
def process(result): 
    for file in result: 
     validate.apply_async(file) 
     run.apply_async(file) 

现在你可以在你的小组使用此任务。