2017-04-19 48 views
10

从气流文档:气流:模式运行的气流subdag一次

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything 

我明白subdagoperator作为一个BackfillJob实际实现的,因此,我们必须提供一个schedule_interval给操作者。然而,有没有办法获得一个subdag的schedule_interval="@once"的语义等价物?我担心,如果我使用设置schedule_interval="@daily"作为subdag,如果subdag运行时间超过一天,则subdag可能会运行多次。

def subdag_factory(parent_dag_name, child_dag_name, args): 
    subdag = DAG(
     dag_id="{parent_dag_name}.{child_dag_name}".format(
      parent_dag_name=parent_dag_name, child_dag_name=child_dag_name 
     ), 
     schedule_interval="@daily", # <--- this bit here 
     default_args=args 
    ) 

    ... do more stuff to the subdag here 
    return subdag 

TLDR:如何捏造出来

回答

2

“只有一次每父DAG的触发运行此subdag”尝试用时间表=无为subdag外部触发模式。在这种情况下,它将只在父母触发时才运行dag

+0

为了说明起见,你建议使用[TriggerDagRunOperator](https://airflow.incubator.apache.org/code.html?highlight=trigger%20dagrun#airflow.operators.TriggerDagRunOperator)来触发一个dag而不用时间表?这个subdag的关键是我们想要阻塞语义,触发器dagrun运算符只是触发一个dagrun,然后继续前进,不会等到dagrun完成。另外,你不会在气流UI中获得一个subdag运行的透明度,你只知道一些随机的dagrun被触发。 – gnicholas

4

我发现 [email protected]对我的子标签工作得很好。也许我的版本已经过时了,但即使所有任务都成功(或跳过)了,我的subdags 也没有出现问题。

运行相当快乐地生活在我的机器上,现在实际的例子代码:

subdag_name = ".".join((parent_name,child_name)) 
logging.info(parent_name) 
logging.info(subdag_name) 
dag_subdag = DAG(
    dag_id=subdag_name, 
    default_args=dargs, 
    schedule_interval="@once", 
) 

其实,我最初建几乎所有我的DAG为荣耀CFG文件我subdags。不知道经过一些试验和错误之后这个想法有多好,但是时间间隔从来都不是我的阻挡者。

我正在运行一个相对较新的1.8版本,只有很少的定制。我一直在遵循示例DAG建议将我的子标签保存在dag文件夹内的文件夹中,以便它们不会显示在DagBag中。

+0

我使用airflow 1.7.1.3和1.8不是ATM选项,因为该版本意外地破坏了自定义执行器插件。我将看看1.8看看是否可以使用'“@once”'时间表来运行subdags,但如果这是真的,那么我会感到惊讶,因为文档说它不是。 – gnicholas

+0

运气好吗?我的代码仍然快乐地逃跑。 我试图在1.7中查找规范的方法来为你做这件事。我能够找到的最接近的东西(假设'@ once'不可行)设置你的'execution_timeout'为实际的subdag任务比你在subdag本身设置的执行频率短。这样,在你的subdag可以启动更多任务之前,你会超时。我知道这是猜测,但我不能轻易地在我们的叉子中找到与您所在的叉子一样古老的气流。 – apathyman

+1

很想听到作者为什么在文档明确表示不应该的时候这么做。 – qwwqwwq