2015-08-26 68 views
1

如图所示here数据流管道由固定的DAG表示。我想知道是否有可能实现一个流水线,直到基于迄今为止计算出的数据满足动态评估条件为止。Dataflow中的迭代处理

下面是一些伪代码来说明想什么,我来实现:

PCollection pco = null 
    while(true): 
     pco = pco.apply(someTransform()) 
     if (conditionSatisfied(pco)): 
      break 
    pco.Write() 

回答

3

好像你真的想迭代计算。目前Dataflow不提供对此的支持,但我们知道这是一个非常重要的用例,我们正在寻找正确的API集来表达它。

现在你的解决方法是:

  • 迭代运行整个管道(管道运行,检查输出,如果条件不满足再次运行等)。这对管道设置和拆卸开销有明显的不利影响。
  • 通过.apply()无条件地在循环中构建一个硬编码迭代次数的管道,然后运行整个管道。
  • 两者的组合,例如运行固定的5次迭代管道,直到你对结果满意为止。