2017-07-09 180 views
-1

我有一个相对简单的任务,首先在1.2 mio文件上运行,并为每个文件(每个都保存中间产品的多个步骤)都有一个管道。我已经在luigi中实现了这个功能:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜欢Luigi使用文件系统来查看任务是否完成。 我还发现了一个可以删除中间产品的实现,管道将重新创建所有相关产品(这样我就可以更改管道)。 我该如何在气流中做到这一点(或者我应该坚持Luigi?)?从luigi切换到气流

回答

1

我真的不知道路易吉是如何工作的。我主要使用Apache Airflow。 Airflow是一个工作流程管理系统。这意味着它不会传输数据,转换数据或生成一些数据(虽然它会生成日志,并且有一个名为Xcom的概念,允许在任务之间交换消息,从而允许更多细微的控制形式和共享状态。 Apache Nifi。但是它定义了使用Operators实例化每个任务的依赖关系,例如。 BashOperator。为了知道任务是否完成,它会检查同一任务返回的信号。

以下是您想要在Airflow中实施的示例。

要在气流使用
from airflow.operators.bash_operator import BashOperator 
from airflow.operators.python_operator import PythonOperator 
import glob 
import gzip 
import shutil 

args = { 
    'owner': 'airflow', 
    'start_date': airflow.utils.dates.days_ago(2) 
} 

dag = DAG(
    dag_id='example_dag', default_args=args, 
    schedule_interval='0 0 * * *', 
    dagrun_timeout=timedelta(minutes=60)) 


def extract_gzs(): 
    for filename in glob.glob('/1002/*.gz') 
     with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out: 
      shutil.copyfileobj(f_in, f_out) 


extractGZ = PythonOperator(
    task_id='extract_gz', 
    provide_context=True, 
    python_callable=extract_gzs(), 
dag=dag) 


cmd_cmd=""" 
your sed script! 
""" 

sed_script = BashOperator(
    task_id='sed_script', 
    bash_command=cmd_cmd, 
    dag=dag) 


extractGZ.set_downstream(sed_script) 
  1. 进口经营者(当然,如果你需要其他类/库)
  2. 定义你的达格。这里在变量args中我定义了ownerstart_date参数。
  3. 然后实例化您的DAG。在这里,我把它命名为example_dag,归功于它的定义变量,schedule_interval和之后的时间应该是超时(有更多的根据自己的需要使用参数)
  4. 创建一个Python函数extract_gzs()
  5. 实例化一个PythonOperator哪里我打电话给我的蟒蛇FUNC
  6. 做同样与bash的代码
  7. 确定两个任务之间的依赖关系intances

当然有更多的实施同样的想法的方式。根据需要来适应! PS:Here有一些Apache Airflow的例子

+0

也许,我误解了整个管道的事情。我假设你建立了一条管道,然后你给它一个数据集,然后沿着这条管道进行变换。这意味着一个管道工作在1.2 mio文件上。这不是正确的思考方式吗?做一个在文件上运行sed的气流管道,然后将其应用到1.2 mio文件应该是微不足道的,不是吗? –

+0

@WolfgangKerzendorf查看我的修改答案。 – sdikby

+0

谢谢,我会试试看。这仍然不完全如我所想的那样。我想为一个文件构建一个管道,然后以某种方式通过这个东西来推动每个文件。 –