airflow

    1热度

    1回答

    函数我想执行一个函数,我从任务传递一个参数。 这里是我的功能与状态参数: def sns_notify(state): client = boto3.client('sns') if state == "failed": message = config.get('sns', 'message') + state else: message =

    1热度

    1回答

    我试图通过传递一个不起作用的Bash行(thisshouldnotrun)来故意排除故障并排除错误。气流正在输出以下内容: [2017-06-15 17:44:17,869] {bash_operator.py:94} INFO - /tmp/airflowtmpLFTMX7/run_bashm2MEsS: line 7: thisshouldnotrun: command not found

    0热度

    1回答

    我们有许多DAG计划每天使用气流运行。依赖关系已使用ExternalTask​​Sensor,TriggerDagRunOperator和运营商定制启用 样品: 任务1在DAG中的依赖于任务2在DAG乙 任务3中DAG甲取决于任务4在DAGÇ 任务5在DAG甲在DAG依赖于任务6 d ... 任务2在DAG B是依赖于任务7在DAGë 任务4在DAG B在DAG依赖于任务8 F .. 在检查UI中

    0热度

    2回答

    如何配置Airflow,以便DAG中的任何故障将(立即)导致松弛消息? 此时此刻我通过创建一个slack_failed_task对其进行管理: slack_failed_task = SlackAPIPostOperator( task_id='slack_failed', channel="#datalabs", trigger_rule='one_failed',

    2热度

    1回答

    我的想法是有一个任务foo,它生成输入列表(用户,报告,日志文件等),并为输入列表中的每个元素启动任务。目标是利用Airflow的重试和其他逻辑,而不是重新实现它。 所以,理想情况下,我应该DAG看起来是这样的: 这里唯一的变量是生成的任务数。在完成所有这些任务之后,我想做更多的任务,因此为每项任务启动新的DAG似乎并不合适。 这是我的代码: default_args = { 'owne

    2热度

    1回答

    我有三个运营商的一个简单的DAG。第一个是PythonOperator与我们自己的功能,另外两个是标准的运营商从airflow.contrib(FileToGoogleCloudStorageOperator和GoogleCloudStorageToBigQueryOperator要准确)。他们按顺序工作。我们的自定义任务会生成许多文件,通常在2到5之间,具体取决于参数。所有这些文件都必须由后续任

    0热度

    1回答

    我不明白我需要运行哪些命令才能获得DAG预定。假设我使用airflow test dag_name task_id_1 2017-06-22测试了DAG,第二项任务使用了airflow test dag_name task_id_2 2017-06-22。 我跑airflow trigger_dag dag_name,但那是为了实例化DAG恰好那一刻吗? 比方说,我想dag_name的定时/调度的

    2热度

    2回答

    我想在不与Airflow GUI交互的情况下创建S3连接。有没有可能通过airflow.cfg或命令行? 我们正在使用AWS的作用,下面的连接参数为我们工作: { “aws_account_id”: “XXXX”, “role_arn”: “YYYYY”} 所以,手动创建的GUI为S3连接工作,现在我们希望自动执行此流程,并希望将其添加为Airflow部署流程的一部分。任何工作?

    1热度

    1回答

    我是新来的气流和意外启动的守护程序模式下的气流调度程序。现在,我想杀死调度器并可能重新启动它。我试着做 sudo kill -9 <list of pids> pkill <name> 什么都没发生。当我运行 ps aux | grep 'airflow scheduler' 我看到这些项: user1 2907 6.0 1.0 329788 62996 ? Sl 17:37

    3热度

    1回答

    我想用docker和rabbitMQ来建立我的气流。我正在使用rabbitmq:3管理映像。我可以访问rabbitMQ UI和API。 在气流中我建立气流webserver,气流调度程序,气流工作者和气流花。 Airflow.cfg文件用于配置气流。 当我使用broker_url = amqp://user:[email protected]:5672/和celery_result_backend