阅读的文档的文档AsyncResult
有一个collect
方法,因为他们进来收集结果
http://docs.celeryproject.org/en/latest/reference/celery.result.html#celery.result.AsyncResult.collect
from celery import group
from proj.celery import app
@app.task(trail=True)
def A(how_many):
return group(B.s(i) for i in range(how_many))()
@app.task(trail=True)
def B(i):
return pow2.delay(i)
@app.task(trail=True)
def pow2(i):
return i ** 2
实施例输出:
>>> from celery.result import ResultBase
>>> from proj.tasks import A
>>> result = A.delay(10)
>>> [v for v in result.collect()
... if not isinstance(v, (ResultBase, tuple))]
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
注意: 必须启用Task.trail
选项,以便将子项列表存储在result.children
中。这是默认值,但是为了说明而明确启用。
嗨,你可能已经很长时间了,因为你遇到了这个问题,但我想知道你如何使用它来跟踪没有阻塞的组任务的进度..?据我所知,我需要分配'result = task_group.apply_async()',但仅仅分配本身就会阻塞。另一方面,如果我们不分配,我们没有ResultSet方法,它们是'completed_count'等等...... – zerohedge 2017-12-29 09:55:18