2017-07-25 27 views
1

考虑以下DAG示例,其中第一个任务get_id_creds从数据库中提取凭证列表。此操作告诉我数据库中的哪些用户能够运行进一步的数据预处理,并将这些ID写入文件/tmp/ids.txt。然后,我将这些ID扫描到我的DAG中,并使用它们生成可并行运行的upload_transaction任务列表。如何动态迭代上游任务的输出以在气流中创建并行任务?

我的问题是:有没有更习惯性地使用气流做到这一点的正确动态方法?我在这里感到笨拙和脆弱。我如何直接将一个有效的ID列表从一个进程传递到定义后续下游进程?

from datetime import datetime, timedelta 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

default_args = { 
    'start_date': datetime.now(), 
    'schedule_interval': None 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

get_id_creds = PythonOperator(
    task_id='get_id_creds', 
    python_callable=dash_workers.get_id_creds, 
    provide_context=True, 
    dag=DAG) 

with open('/tmp/ids.txt', 'r') as infile: 
    ids = infile.read().splitlines() 

for uid in uids: 
    upload_transactions = PythonOperator(
     task_id=uid, 
     python_callable=dash_workers.upload_transactions, 
     op_args=[uid], 
     dag=DAG) 
    upload_transactions.set_downstream(get_id_creds) 
+1

检查https://stackoverflow.com/questions/41517798/proper-way-to-create-dynamic-workflows-in-airflow –

+0

@JuanRiaza就是这样,非常感谢。我最终使用的解决方案简化了代码,足以将我的解决方案作为一个单独的答案发布 - 我在当天结束时没有任何对'Xcom'的需求 – Aaron

回答

1

考虑到Apache Airflow是一个工作流管理工具,即。它决定了用户在比较中定义的任务(作为例子)与作为数据流管理工具的Apache Nifi之间的依赖关系,这里的依赖关系是通过任务传输的数据。

也就是说,我认为你的做法是正确的退出(我的意见的基础上发布的代码)气流提供了一个称为XCom概念。它允许任务通过传递一些数据在它们之间“交叉通信”。通过的数据应该多大?这取决于你测试!但通常情况下它不应该那么大。我认为它是以键值对的形式存储在气流元数据库中的,也就是说,例如你不能传递文件,但是带有ID的列表可以工作。

就像我说你应该测试你的自我。我会很高兴知道你的经验。 Here是一个示例dag,它演示了使用XComhere是必要的文档。干杯!

+0

感谢您提供的反馈@sdikby。我昨天花了一些时间研究'Xcom',觉得我对于如何在任务之间交换数据有着强烈的概念性把握。在这种情况下,我没有看到这适用于我正在寻找一种方法,在同一个DAG中的上游任务的输出上创建任意数量的任务_based_。我想到我可以创建两个DAG,一个用于定义任务参数,另一个用于执行,但这并不理想,因为我失去了依赖关系。你是否同意'Xcom'不支持这个应用程序,或者我错过了somehing? – Aaron

0

Per @Juan Riza的建议我查看了这个链接:Proper way to create dynamic workflows in Airflow。这是相当多的答案,但我能简化的解决方案,以至于我以为我会提供在这里实现我自己修改的版本:

from datetime import datetime 
import os 
import sys 

from airflow.models import DAG 
from airflow.operators.python_operator import PythonOperator 

import ds_dependencies 

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH') 
if SCRIPT_PATH: 
    sys.path.insert(0, SCRIPT_PATH) 
    import dash_workers 
else: 
    print('Define DASH_PREPROC_PATH value in environmental variables') 
    sys.exit(1) 

ENV = os.environ 

default_args = { 
    # 'start_date': datetime.now(), 
    'start_date': datetime(2017, 7, 18) 
} 

DAG = DAG(
    dag_id='dash_preproc', 
    default_args=default_args 
) 

clear_tables = PythonOperator(
    task_id='clear_tables', 
    python_callable=dash_workers.clear_db, 
    dag=DAG) 

def id_worker(uid): 
    return PythonOperator(
     task_id=uid, 
     python_callable=dash_workers.main_preprocess, 
     op_args=[uid], 
     dag=DAG) 

for uid in capone_dash_workers.get_id_creds(): 
    clear_tables >> id_worker(uid) 

clear_tables清理,将被重新构建成数据库过程的结果。 id_worker是一个基于从get_if_creds返回的ID值数组动态生成新预处理任务的函数。任务ID只是相应的用户ID,尽管它可能很容易成为索引i,如上例所述。

注意这位位移运算符(<<)向后在我看来,作为clear_tables任务应该先,但它是什么,似乎在这种情况下进行工作。

相关问题