2017-10-18 116 views
0

我想安排一个Python作业,每15分钟启动一次。我已经使用过气流,并没有遇到过任何问题。 我用今天早些时候的开始日期创建了一个dag,频率为15分钟,两个任务包括激活一个虚拟Python环境,然后启动一个python脚本。Airflow不会启动我的dags(激活python环境并启动python脚本)

但是,我的dags不会执行自己,所以我启动了一个web服务器来检查它的状态并且什么也没有发生。因此,我尝试使用trigger_dag命令在外部启动它,但其状态保持运行状态。我真的不明白问题是什么,任何帮助将不胜感激。我附上显示问题的Airflow网络服务器的两个屏幕截图。

enter image description here

enter image description here

编辑:添加dags.py文件,这里是我的DAG的定义:

import os 
from airflow import DAG 
from datetime import datetime, timedelta 
from airflow.operators.bash_operator import BashOperator 


default_args = { 
    'owner': 'test', 
    'depends_on_past': False, 
    'start_date': datetime(2017,10,13,0,0,0,0), 
    'email': ['[email protected]'], 
    'email_on_failure': False, 
    'email_on_retry': False, 
    'retries': 1, 
    'retry_delay': timedelta(minutes=15), 
    # 'queue': 'bash_queue', 
    # 'pool': 'backfill', 
    # 'priority_weight': 10, 
    #'end_date': datetime(2017, 9, 23), 
} 

dag = DAG('dbscan_integ', default_args=default_args) 


t_dbscan = BashOperator(
    task_id='job_batch_dbscan', 
     bash_command='/home/test/Documents/git_repo/analyser/algo_integ/integration_dbscan/python main_algo.py', 
dag=dag) 


t_virtual_dbscan = BashOperator(
    task_id='virtual_dbscan', 
    bash_command='source activate integdb', 
    dag=dag) 
t_dbscan.set_upstream(t_virtual_dbscan) 
+0

如果您也从DAG分享相关的代码片段,这将有所帮助。如果您错过了在定义它们时将任务分配给DAG,就会发生这种情况。 – Him

+0

@Him我添加了我的dags的代码,但是我认为我的任务被正确地分配给任务,这要归功于参数dag = dag –

回答

1

我知道这是愚蠢的,但你有调度和工人正常运转?


编辑1:

好吧,我想原因是,你不必在你的DAG schedule_interval,而不是你给timedelta(分钟= 15)retry_delay。

+0

这不是我注意到并修复它的schedule_interval。我不明白调度器必须单独运行,我会在星期一进行测试。非常感谢你 –