考虑以下DAG示例,其中第一个任务get_id_creds
从数据库中提取凭证列表。此操作告诉我数据库中的哪些用户能够运行进一步的数据预处理,并将这些ID写入文件/tmp/ids.txt
。然后,我将这些ID扫描到我的DAG中,并使用它们生成可并行运行的upload_transaction
任务列表。如何动态迭代上游任务的输出以在气流中创建并行任务?
我的问题是:有没有更习惯性地使用气流做到这一点的正确动态方法?我在这里感到笨拙和脆弱。我如何直接将一个有效的ID列表从一个进程传递到定义后续下游进程?
from datetime import datetime, timedelta
import os
import sys
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
import ds_dependencies
SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
sys.path.insert(0, SCRIPT_PATH)
import dash_workers
else:
print('Define DASH_PREPROC_PATH value in environmental variables')
sys.exit(1)
default_args = {
'start_date': datetime.now(),
'schedule_interval': None
}
DAG = DAG(
dag_id='dash_preproc',
default_args=default_args
)
get_id_creds = PythonOperator(
task_id='get_id_creds',
python_callable=dash_workers.get_id_creds,
provide_context=True,
dag=DAG)
with open('/tmp/ids.txt', 'r') as infile:
ids = infile.read().splitlines()
for uid in uids:
upload_transactions = PythonOperator(
task_id=uid,
python_callable=dash_workers.upload_transactions,
op_args=[uid],
dag=DAG)
upload_transactions.set_downstream(get_id_creds)
检查https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow –
@JuanRiaza就是这样,非常感谢。我最终使用的解决方案简化了代码,足以将我的解决方案作为一个单独的答案发布 - 我在当天结束时没有任何对'Xcom'的需求 – Aaron