0
我有一个Airflow操作员在第三方服务上踢任务,然后监视该作业的进度。在代码中,如果气流工人重新启动(通常是由于代码部署)的执行貌似在重新启动的气流任务实例中恢复状态
def execute(self, context):
external_id = start_external_job()
wait_until_external_job_completes(external_id)
这个任务的一个实例运行时,我想该任务的重新启动的实例是能够拿起前一个离开的地方(监控第三方服务的工作)。有没有办法在同一任务实例的后续运行中共享第三方作业ID?
的增强执行方法的一个例子是这样的:
def execute(self, context):
external_id = load_external_id_for_task_instance()
if external_id is None:
external_id = start_external_job(args)
persist_external_id_for_task_instance(external_id)
wait_until_external_job_completes(external_id)
我需要实现load_external_id_for_task_instance
和persist_external_id_for_task_instance
。
谢谢!我刚刚实施,这是一种魅力! –