2017-03-23

I have a DAG with the following two SSHExecuteOperator tasks. The first task executes a stored procedure that returns a parameter. The second task needs this parameter as input. How can I retrieve the value of the Airflow XCom pushed by the SSHExecuteOperator?

Please explain how to get the value from the XCom pushed in task1 so that it can be used in task2.

from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.contrib.hooks.ssh_hook import SSHHook 
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator 
from airflow.models import Variable 

default_args = { 
    'owner': 'airflow', 
    'depends_on_past': False, 
    'start_date': datetime.now(), 
    'email': ['[email protected]'], 
    'email_on_failure': True, 
    'retries': 0 
} 

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin 
DataQualitySSHHook = Variable.get('DataQualitySSHHook') 
print('Connecting to: ' + DataQualitySSHHook) 
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook) 
sshHookEtl.no_host_key_check = True 

#create dag 
dag = DAG(
    'ed_data_quality_test-v0.0.3', #update version whenever you change something 
    default_args=default_args, 
    schedule_interval="0 0 * * *", 
    dagrun_timeout=timedelta(hours=24), 
    max_active_runs=1) 

#create tasks 
task1 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_batch_register', 
    bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end 
    ssh_hook=sshHookEtl, 
    xcom_push=True, 
    retries=0, 
    dag=dag) 

task2 = SSHExecuteOperator(
    task_id='run_remote_sp_audit_module_session_start', 
    bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}", 
    ssh_hook=sshHookEtl, 
    retries=0, 
    dag=dag) 

#create dependencies 
task1.set_downstream(task2) 

Your DAG definition looks fine. Are you able to run the DAG successfully? Any errors? –

Answer


So, the solution I found is this: when task1 executes the shell script, you have to make sure that the parameter you want captured as the XCom value is the last thing the script prints (using echo).
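A minimal sketch of that pattern (the variable name and the value 42 are illustrative, not taken from the real EXEC_SP_AUDIT_BATCH.sh): send diagnostic messages to stderr so they don't pollute stdout, and echo the value you want in XCom as the final line, since with xcom_push=True the SSHExecuteOperator captures the command's output.

```shell
#!/bin/bash
# Sketch only -- the real script would call the stored procedure here.
batch_id=42                       # stand-in for the stored procedure's return value
echo "Registering batch..." >&2   # diagnostics go to stderr, not stdout
echo "$batch_id"                  # last line on stdout -> captured into XCom
```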

Then I could retrieve the XCom value with the following snippet:

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}