我有一个相对简单的任务,首先在1.2 mio文件上运行,并为每个文件(每个都保存中间产品的多个步骤)都有一个管道。我已经在luigi中实现了这个功能:https://gist.github.com/wkerzendorf/395c85a2955002412be302d708329f7f。我喜欢Luigi使用文件系统来查看任务是否完成。 我还发现了一个可以删除中间产品的实现,管道将重新创建所有相关产品(这样我就可以更改管道)。 我该如何在气流中做到这一点(或者我应该坚持Luigi?)?从luigi切换到气流
-1
A
回答
1
我真的不知道路易吉是如何工作的。我主要使用Apache Airflow。 Airflow是一个工作流程管理系统。这意味着它不会传输数据,转换数据或生成一些数据(虽然它会生成日志,并且有一个名为Xcom
的概念,允许在任务之间交换消息,从而允许更多细微的控制形式和共享状态。 Apache Nifi。但是它定义了使用Operators
实例化每个任务的依赖关系,例如。 BashOperator
。为了知道任务是否完成,它会检查同一任务返回的信号。
以下是您想要在Airflow中实施的示例。
要在气流使用from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
import glob
import gzip
import shutil
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='example_dag', default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60))
def extract_gzs():
for filename in glob.glob('/1002/*.gz')
with gzip.open(filename, 'rb') as f_in, open(filename[:-3], 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
extractGZ = PythonOperator(
task_id='extract_gz',
provide_context=True,
python_callable=extract_gzs(),
dag=dag)
cmd_cmd="""
your sed script!
"""
sed_script = BashOperator(
task_id='sed_script',
bash_command=cmd_cmd,
dag=dag)
extractGZ.set_downstream(sed_script)
- 进口经营者(当然,如果你需要其他类/库)
- 定义你的达格。这里在变量
args
中我定义了owner
和start_date
参数。 - 然后实例化您的DAG。在这里,我把它命名为example_dag,归功于它的定义变量,schedule_interval和之后的时间应该是超时(有更多的根据自己的需要使用参数)
- 创建一个Python函数extract_gzs()
- 实例化一个
PythonOperator
哪里我打电话给我的蟒蛇FUNC - 做同样与bash的代码
- 确定两个任务之间的依赖关系intances
当然有更多的实施同样的想法的方式。根据需要来适应! PS:Here有一些Apache Airflow的例子
相关问题
- 1. FFmpeg:Remux f4v从流切换到mp4
- 2. luigi -pip install luigi
- 3. NVD3切换流
- 4. 将流体从一个气缸转移到另一个气缸
- 5. 切换视频流
- 6. 气流
- 7. 气流:dag_id找不到
- 8. 切换到新的工作流程
- 9. 自定义气球 - 切换按钮
- 10. 在工作流设计器中从VB切换到C#
- 11. 春季Websocket与SockJs从XHR流切换到Websocket
- 12. 从Hg切换到Git的Subrepo工作流问题
- 13. grafana从http切换到https
- 14. 从InstallShield切换到WiX
- 15. 从JWPlayer切换到Flowplayer
- 16. 从ActiveAndroid切换到GreenDao
- 17. 从PHP切换到Objective-C
- 18. 从3 colums切换到2
- 19. NSURLSessionDownloadTask从http切换到https
- 20. 从UIViewController切换到UITabBarController
- 21. 从bottle.template切换到mako
- 22. 从md5()切换到crypt()
- 23. 从IF/ELSE切换到Javascript
- 24. 从Android Activity切换到MapActivity
- 25. 从Nashorn切换到Rhino(Gradle)
- 26. 从maven切换到sbt
- 27. 从ListView切换到RecyclerView
- 28. 从$资源切换到Restarular
- 29. Android:从XML切换到SQLite
- 30. 从Eclipse切换到Netbeans
也许,我误解了整个管道的事情。我假设你建立了一条管道,然后你给它一个数据集,然后沿着这条管道进行变换。这意味着一个管道工作在1.2 mio文件上。这不是正确的思考方式吗?做一个在文件上运行sed的气流管道,然后将其应用到1.2 mio文件应该是微不足道的,不是吗? –
@WolfgangKerzendorf查看我的修改答案。 – sdikby
谢谢,我会试试看。这仍然不完全如我所想的那样。我想为一个文件构建一个管道,然后以某种方式通过这个东西来推动每个文件。 –