apache-airflow

    2热度

    3回答

    我需要引用由BashOperator返回的变量。我可能做错了,请原谅我。在我的task_archive_s3_file中,我需要从get_s3_file获取文件名。该任务仅将{{ ti.xcom_pull(task_ids=submit_file_to_spark) }}作为字符串打印而不是值。 如果我使用bash_command,则打印值正确。 get_s3_file = PythonOpera

    0热度

    1回答

    我试图找出Airflow如何在多租户环境中工作。具体来说,要求应该如下所示: TeamA和TeamB两个团队正在使用单个Airflow实例。 团队的A和B每个都有自己的服务用户帐户:serviceUserA和ServiceUserB他们应该运行他们的工作。 出于安全原因,团队A不应该能够创建在ServiceUserB下运行的作业,反之亦然。 在这一点上我不清楚需求3)是否可以用Airflow来满足

    1热度

    2回答

    我想触发simplehttpoperator,像这样: 气流trigger_dag test_trigger --conf '{ “名”: “东西”}' 然后我用pythonoperator python_callable使用kwargs [ 'dag_run' 接受的参数] .conf,我想将['dag_run']。conf传递给simplehttpoperator,我该怎么做?任何人都可以帮忙

    1热度

    1回答

    我在使用LocalScheduler选项的EC2实例上使用气流。我已经调用airflow scheduler和airflow webserver,一切似乎都运行良好。也就是说,在将cron字符串提供给schedule_interval用于“每10分钟执行一次”'*/10 * * * *'后,该作业默认每24小时继续执行一次。下面的代码头: from datetime import datetime

    3热度

    1回答

    我已经搜索了很多链接,但没有找到解决我遇到的问题的任何方法。我已经看到了将密钥/ var传递给airflow UI的选项,但是最终用户使用哪个密钥与哪个dag关联是非常令人困惑的。有没有什么办法可以实现如下功能: While running an airflow job, end user will be asked for values to some parameters and after

    2热度

    2回答

    使用最新版本的apache airflow。从LocalExecutor开始,在该模式下,一切工作正常,除了Web UI状态需要使用CeleryExecutor的一些交互。使用Redis安装和配置Celery执行程序,将Redis配置为代理程序URL和结果后端。 它出现在第一个工作,直到任务计划此时它提供了以下错误: File "/bin/airflow", line 28, in <module

    0热度

    1回答

    是否有任何直接的方法可以将shell脚本运行到dataproc集群中。目前我可以通过pysparkoperator(它调用aonther python文件,然后这个python文件调用shell脚本)运行shell。我搜查了很多链接,但至今没有找到任何直接的方式。 如果有人能告诉我最简单的方法,对我来说真的很有帮助。与sh运营商[1]

    1热度

    1回答

    我试图用气流脚本来运行存在于云存储HQL文件,有两个参数,通过它我们可以通过DataprocHiveOperator路径: 查询: 'GS://bucketpath/filename.q' Error occuring - cannot recognize input near 'gs' ':' '/' query_uri: 'GS://bucketpath/filename.q' Error o

    0热度

    1回答

    试图促成气流,但不能在Ubuntu 16.0.4上运行并运行。一些事情正在与kerbos进行。 https://github.com/apache/incubator-airflow py34-hdp-airflow_backend_postgres runtests: commands[2] | sudo /home/dalupus/incubator-airflow/scripts/ci/se

    0热度

    1回答

    我有一个Airflow操作员在第三方服务上踢任务,然后监视该作业的进度。在代码中,如果气流工人重新启动(通常是由于代码部署)的执行貌似 def execute(self, context): external_id = start_external_job() wait_until_external_job_completes(external_id) 这个任务的一个实例运