2017-07-15 63 views
0

我有一个DAG,它是通过查询DynamoDB获取列表创建的,并且列表中的每个项目都使用PythonOperator创建并将其添加到DAG中。在下面的例子中没有显示,但重要的是要注意列表中的一些项目取决于其他任务,所以我使用set_upstream来强制执行依赖关系。动态创建任务列表

- airflow_home 
    \- dags 
    \- workflow.py 

workflow.py

def get_task_list(): 
    # ... query dynamodb ... 

def run_task(task): 
    # ... do stuff ... 

dag = DAG(dag_id='my_dag', ...) 
tasks = get_task_list() 
for task in tasks: 
    t = PythonOperator(
     task_id=task['id'], 
     provide_context=False, 
     dag=dag, 
     python_callable=run_task, 
     op_args=[task] 
    ) 

问题是workflow.py是越来越一遍又一遍地跑,我的get_task_list()方法得到通过AWS和抛出异常节流(每一个任务运行?时间)。

我认为这是因为每当run_task()被称为它运行所有的全局数据workflow.py所以我试着移动run_task()到一个单独的模块,像这样:

- airflow_home 
    \- dags 
    \- workflow.py 
    \- mypackage 
     \- __init__ 
     \- task.py 

但它并没有改变任何东西。我甚至尝试将get_task_list()放入一个使用工厂函数包装的SubDagOperator中,该工具的行为方式仍然相同。

我的问题与这些问题有关吗?

而且,为什么workflow.py得到运行,因此经常和为什么会任务的时候方法不引用workflow.py通过get_task_list()抛出事业的各个任务的错误和失败对它没有依赖性?

最重要的是,什么是最好的方式来并行处理列表并强制列表中的项目之间的任何依赖关系?

回答

1

根据您引用的问题,在dag运行时,气流不支持创建任务。

因此,气流将在开始运行之前定期生成完整的DAG定义。理想情况下,此类生成的时间应该与该DAG的时间间隔相同。

但是这可能是每次气流检查dag的变化时,它也会生成完整的dag,从而导致请求过多。该时间使用airflow.cfg中的配置min_file_process_interval和dag_dir_list_interval进行控制。

关于任务失败,他们失败,因为创建DAG本身失败,气流无法启动它们。

+0

将'min_file_process_interval'设置为30,将对'get_task_list()'的调用减慢到30秒,并且我停止了被限制。至于动态任务创建,我将尝试创建一个dag,它将构建另一个dag并将其保存到'globals()[dag_id]',如[FAQ]中所述(http://airflow.readthedocs.io/ EN /最新/ faq.html常见?亮点=动态#如何,可以-I-创建-DAG的-动态) –