2017-06-24 33 views
1

我有一个dag检查FTP服务器上的文件(气流运行在不同的服务器上)。如果存在文件,则文件被移到S3(我们在这里存档)。从那里,文件名被传递给Spark提交作业。火花作业将通过S3(不同服务器上的Spark集群)处理文件。我不确定是否需要有多个dag,但这里是流程。我想要做的只是在S3存储桶中存在文件时才运行Spark作业。如何在气流中途成功退出任务?

我尝试使用S3传感器,但它符合超时标准后失败/超时,因此整个DAG设置为失败。

check_for_ftp_files -> move_files_to_s3 -> submit_job_to_spark -> archive_file_once_done 

我只想要做的FTP检查,只有当一个或多个文件移入S3脚本后运行一切。

回答

1

您可以有2个不同的DAG。一个只有S3传感器,并且每5分钟一直运行。如果它找到该文件,它会触发第二个DAG。第二个DAG将文件提交给S3并进行归档(如果完成)。您可以在第一个DAG中使用TriggerDagRunOperator进行触发。

+0

如果没有找到文件,会不会退出w /错误代码会发生什么?因此有人需要重新开展工作,不是吗? – luckytaxi

+0

第一个DAG(有两个任务,S3Sensor和TriggerDagRunOperator)可以计划每五分钟运行一次。这意味着传感器每5分钟运行一次,如果发现文件,则会触发第二个DAG。否则,它什么都不做,5分钟后重播。如果它以一个错误代码退出并不重要(您不应将第一个DAG的depends_on_past设置为true)。 – Him

0

他给出的答案将起作用。 另一种选择是使用传感器具有的“soft_fail”参数(它是来自BaseSensorOperator的参数)。如果您将此参数设置为True,而不是失败任务,则它将跳过它,并且分支中的所有后续任务也将被跳过。

有关更多信息,请参阅airflow code

相关问题