我正在尝试实现一个相当简单的Celery工作流,其中我接收到与元组(或列表)相同任务的多个并行调用的结果。收集来自并行Celery任务执行的结果
@app.task
def add(x, y):
return x + y
@app.task
def master():
return group(add.s(1, 2), add.s(3, 4))()
由此,我想在一个通用的方法检索(3, 7)
,即,在不依赖于工作流本身的一种方式。我正在寻找某种“将异步结果图减少到原语”操作。我已经尝试用下面的(我已经取代结果的ID与#num
为简洁起见)
r = master.delay()
r.get() # <GroupResult: #1 [#2, #3]>
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>),
# (<GroupResult: #1 [#2, #3]>, [3, 7])
# (<AsyncResult: #2>, 3),
# (<GroupResult: #3>, 7)]
r.get()
返回围绕两个AsyncResult
标识的包装,所以我将不得不递归处理每一个。 r.collect()
已接近,但它递归太深。
我可以做类似
r.children[0].get()
但这不是通用的,因为它明确地依赖于结果图的结构。另外,我可以遍历r.collect()
,直到我找到一个元组,其价值不是ResultBase
一个实例,像
next(value for _, value in r.collect() if not isinstance(value, ResultBase))
,但我不知道这是在所有情况下实际上是正确的,我希望有一个更优雅的方式来做到这一点。
如果有一种重构master
任务的方式来让检索结果更容易,我会对它开放,只要子任务并行启动即可。任何建议,将不胜感激。先谢谢你。
编辑一个相关的问题是,如果我想打电话r.get()
或r.collect()
之前检索在非阻塞方式任务结果(例如,通过手动查询r.status
,我不能简单地做到这一点
r = master.delay()
# some time later...
if r.status in READY_STATES:
r.get()
因为r
是AsyncResult
其解析为GroupResult
,即它的GroupResult
或其孩子之前完成。有没有办法来调用组的方式那“跳过”顶级AsyncResult
?这将解决这两个问题,因为r.status
和r.get()
将分别反映子任务的状态和值。
我正面临着类似的挑战,您目前遇到了很多麻烦。你能分享一大部分代码吗?你在哪里存储'r.collect()',是否阻塞?你有'for'循环吗?一个'while'循环? – zerohedge
这样做是为了在小组结束后还是在一组工作正在进行时收集结果? – zerohedge