2015-12-14 57 views
3

我正在尝试实现一个相当简单的Celery工作流,​​其中我接收到与元组(或列表)相同任务的多个并行调用的结果。收集来自并行Celery任务执行的结果

@app.task 
def add(x, y): 
    return x + y 

@app.task 
def master(): 
    return group(add.s(1, 2), add.s(3, 4))() 

由此,我想在一个通用的方法检索(3, 7),即,在不依赖于工作流本身的一种方式。我正在寻找某种“将异步结果图减少到原语”操作。我已经尝试用下面的(我已经取代结果的ID与#num为简洁起见)

r = master.delay() 
r.get()  # <GroupResult: #1 [#2, #3]> 
r.collect() # [(<AsyncResult: #0>, <GroupResult #1 [#2, #3]>), 
      # (<GroupResult: #1 [#2, #3]>, [3, 7]) 
      # (<AsyncResult: #2>, 3), 
      # (<GroupResult: #3>, 7)] 

r.get()返回围绕两个AsyncResult标识的包装,所以我将不得不递归处理每一个。 r.collect()已接近,但它递归太深。

我可以做类似

r.children[0].get() 

但这不是通用的,因为它明确地依赖于结果图的结构。另外,我可以遍历r.collect(),直到我找到一个元组,其价值不是ResultBase一个实例,像

next(value for _, value in r.collect() if not isinstance(value, ResultBase)) 

,但我不知道这是在所有情况下实际上是正确的,我希望有一个更优雅的方式来做到这一点。

如果有一种重构master任务的方式来让检索结果更容易,我会对它开放,只要子任务并行启动即可。任何建议,将不胜感激。先谢谢你。


编辑一个相关的问题是,如果我想打电话r.get()r.collect()之前检索在非阻塞方式任务结果(例如,通过手动查询r.status,我不能简单地做到这一点

r = master.delay() 

# some time later... 
if r.status in READY_STATES: 
    r.get() 

因为rAsyncResult其解析为GroupResult,即它的GroupResult或其孩子之前完成。有没有办法来调用组的方式那“跳过”顶级AsyncResult?这将解决这两个问题,因为r.statusr.get()将分别反映子任务的状态和值。

回答

1

当然,正确的解决方案是最简单的解决方案:调用master作为在当前进程中执行它的函数。

r = master() 
r.get()  # [3, 7] 
r.collect() # [(<GroupResult: #1 [#2, #3]>, [3, 7]), 
      # (<AsyncResult: #2>, 3), 
      # (<AsyncResult: #3>, 7)] 

而是推迟group启动代码的工作进程,它在目前的进程启动。由于group完全是异步的,因此行为不会改变,性能会提高。

+0

我正面临着类似的挑战,您目前遇到了很多麻烦。你能分享一大部分代码吗?你在哪里存储'r.collect()',是否阻塞?你有'for'循环吗?一个'while'循环? – zerohedge

+0

这样做是为了在小组结束后还是在一组工作正在进行时收集结果? – zerohedge