2017-05-19 26 views
1

每当airflow dag运行并从所有任务访问该文件时,我们是否可以创建唯一的文件名? 我试着创建全局变量(output_filename)并为其添加时间戳。 但是,当我在任务中访问file_name时,每个任务都会生成不同的文件名,因为它正在计算每个任务中的时间戳。 下面是示例代码:创建唯一的文件名并在所有气流任务中访问该文件

table_name = 'Test_ABC' 
start_date = datetime.now() 
cur_tmpstp = start_date.strftime('%Y_%m_%d') 

output_filename = table_name + "_" + cur_tmpstp + ".csv" 
S3_landing_path = "s3://abc/" 

def clean_up(): 
    if os.path.exists(output_filename): 
     os.remove(output_filename) 


task_1 = BashOperator(
    task_id='task_1', 
    bash_command="aws s3 cp %s %s/ " %(output_filename, S3_landing_path,), 
    dag=dag) 

task_2_cleanup = PythonOperator(
    task_id='task_2_cleanup', 
    python_callable=clean_up, 
    dag=dag) 

我们,我们将要访问output_filename更多的任务。 我们如何在所有任务中访问output_filename全局变量?

回答

1

如果您只需要日间粒度的时间戳,那么您可以在模板中使用默认变量。这些变量的一些例子(取自http://airflow.readthedocs.io/en/latest/code.html#default-variables)是

{{ ds }} the execution date as YYYY-MM-DD 
{{ ds_nodash }}  the execution date as YYYYMMDD 
{{ execution_date }} the execution_date, (datetime.datetime) 
相关问题