2017-10-04 31 views
0

我有一个Airflow操作员在第三方服务上踢任务,然后监视该作业的进度。在代码中,如果气流工人重新启动(通常是由于代码部署)的执行貌似在重新启动的气流任务实例中恢复状态

def execute(self, context): 
    external_id = start_external_job() 
    wait_until_external_job_completes(external_id) 

这个任务的一个实例运行时,我想该任务的重新启动的实例是能够拿起前一个离开的地方(监控第三方服务的工作)。有没有办法在同一任务实例的后续运行中共享第三方作业ID?

的增强执行方法的一个例子是这样的:

def execute(self, context): 
    external_id = load_external_id_for_task_instance() 
    if external_id is None: 
     external_id = start_external_job(args) 
     persist_external_id_for_task_instance(external_id) 

    wait_until_external_job_completes(external_id) 

我需要实现load_external_id_for_task_instancepersist_external_id_for_task_instance

回答

0

我建议使用XComsSensors将其分成两个任务。

你可以有一个操作员将作业提交并保存ID到XCOM:

class SubmitJobOperator(BaseOperator): 

    def execute(self, context): 
     external_id = start_external_job() 
     return external_id # return value will be stored in XCom 

然后,其获取来自XCOM和轮询ID,直到完成传感器:

class JobCompleteSensor(BaseSensor): 

    @apply_defaults 
    def __init__(self, submit_task_id, *args, **kwargs): 
     self.submit_task_id = submit_task_id # so we know where to fetch XCom value from 
     super(JobCompleteSensor, self).__init__(*args, **kwargs) 

    def poke(self, context): 
     external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id) 
     return check_if_external_job_is_complete(external_id): 

所以你的DAG看起来是这样的:

submit_job = SubmitJobOperator(
    dag=dag, 
    task_id='submit_job', 
) 

wait_for_job_to_complete = JobCompleteSensor(
    dag=dag, 
    task_id='wait_for_job_to_complete', 
    submit_task_id=submit_job.task_id, 
) 

submit_job >> wait_for_job_to_complete 

XComs被持久化在数据库中,所以s ensor将始终能够找到以前提交的external_id

+0

谢谢!我刚刚实施,这是一种魅力! –

相关问题