我想安排一个Python作业,每15分钟启动一次。我已经使用过气流,并没有遇到过任何问题。 我用今天早些时候的开始日期创建了一个dag,频率为15分钟,两个任务包括激活一个虚拟Python环境,然后启动一个python脚本。Airflow不会启动我的dags(激活python环境并启动python脚本)
但是,我的dags不会执行自己,所以我启动了一个web服务器来检查它的状态并且什么也没有发生。因此,我尝试使用trigger_dag命令在外部启动它,但其状态保持运行状态。我真的不明白问题是什么,任何帮助将不胜感激。我附上显示问题的Airflow网络服务器的两个屏幕截图。
编辑:添加dags.py文件,这里是我的DAG的定义:
import os
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': datetime(2017,10,13,0,0,0,0),
'email': ['[email protected]'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=15),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
#'end_date': datetime(2017, 9, 23),
}
dag = DAG('dbscan_integ', default_args=default_args)
t_dbscan = BashOperator(
task_id='job_batch_dbscan',
bash_command='/home/test/Documents/git_repo/analyser/algo_integ/integration_dbscan/python main_algo.py',
dag=dag)
t_virtual_dbscan = BashOperator(
task_id='virtual_dbscan',
bash_command='source activate integdb',
dag=dag)
t_dbscan.set_upstream(t_virtual_dbscan)
如果您也从DAG分享相关的代码片段,这将有所帮助。如果您错过了在定义它们时将任务分配给DAG,就会发生这种情况。 – Him
@Him我添加了我的dags的代码,但是我认为我的任务被正确地分配给任务,这要归功于参数dag = dag –