2013-02-28 51 views
3
@celery.task 
def my_task(my_object): 
    do_something_to_my_object(my_object) 


#in the code somewhere 
tasks = celery.group([my_task.s(obj) for obj in MyModel.objects.all()]) 
group_task = tasks.apply_async() 

问题:芹菜是否有东西可以检测到组任务的进度?我可以得到有多少任务在那里,有多少人已经处理?跟踪celery.group任务的进度?

回答

3

摆弄壳(ipython的选项卡自动完成)我发现group_task(这是一个celery.result.ResultSet对象)有一个名为completed_count的方法,它给出了我所需要的。

还发现在http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.ResultSet.completed_count

+0

嗨,你可能已经很长时间了,因为你遇到了这个问题,但我想知道你如何使用它来跟踪没有阻塞的组任务的进度..?据我所知,我需要分配'result = task_group.apply_async()',但仅仅分配本身就会阻塞。另一方面,如果我们不分配,我们没有ResultSet方法,它们是'completed_count'等等...... – zerohedge 2017-12-29 09:55:18

0

阅读的文档的文档AsyncResult有一个collect方法,因为他们进来收集结果

http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect

from celery import group 
from proj.celery import app 

@app.task(trail=True) 
def A(how_many): 
    return group(B.s(i) for i in range(how_many))() 

@app.task(trail=True) 
def B(i): 
    return pow2.delay(i) 

@app.task(trail=True) 
def pow2(i): 
    return i ** 2 

实施例输出:

>>> from celery.result import ResultBase 
>>> from proj.tasks import A 

>>> result = A.delay(10) 
>>> [v for v in result.collect() 
... if not isinstance(v, (ResultBase, tuple))] 
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81] 

注意: 必须启用Task.trail选项,以便将子项列表存储在result.children中。这是默认值,但是为了说明而明确启用。