2017-04-19 25 views
0

我正在使用包含多个子数据标的主dag(main_dag),并且每个子数据都有一些任务。我从subdagA taskA中推送了一个xcom,但我在subdagB taskB中拉取了该xcom。由于xcom_pull()中的dag_id参数默认为self.dag_id,因此我一直无法提取必要的xcom。我想知道如何做到这一点和/或是否有更好的方法来设定这种情况,所以我不必处理这个问题。什么我目前在做subdagB从子dag中拉xcom

例如:

def subdagB(parent_dag, child_dag, start_date, schedule_interval): 
    subdagB = DAG('%s.%s' % (parent_dag, child_dag), start_date=start_date, schedule_interval=schedule_interval) 
    start = DummyOperator(
     task_id='taskA', 
     dag=subdagB) 
    tag_db_template = '''echo {{ task_instance.xcom_pull(dag_id='dag.main_dag.subdagA', task_ids='taskA') }};''' 
    t1 = BashOperator(
     task_id='taskB', 
     bash_command=tag_db_template, 
     xcom_push=True, 
     dag=subdagB) 
    end = DummyOperator(
     task_id='taskC', 
     dag=subdagB) 
    t0.set_upstream(start) 
    t1.set_upstream(t0) 
    end.set_upstream(t1) 
    return subdagB 

预先感谢您的帮助!

回答

0

只要你重写[Operator] .xcom_pull(dag_id = dag_id,...)或[TaskInstance] .xcom_pull(dag_id = dag_id,...)中的dag_id,你应该没问题。

where dag_id = "{parent_dag_id}.{child_dag_id}"

如果你可以让你的例子更完整,我可以尝试本地运行它,但我测试了(类似)的例子,跨subdag-xcoms正常工作。

+0

请注意,如果您使用TriggerDagRunOperator,则默认情况下,DagRun将具有不同的执行日期,并且您将无法跨* DAG取消xcoms *。您可以自定义dagrun_operator.py以保留调用DAG的execution_date以解决此问题。 – jastang