在您通过HQL脚本传递$ {xxx} vars并将其预处理以将其转换为{{xxx}} Jinja样式在实际进行模板渲染的阶段之前,然后用用户提供的字典中的值替换它们。我相信这是因为没有在HiveOperator类这样的功能:
def prepare_template(self):
if self.hiveconf_jinja_translate:
self.hql = re.sub(
"(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql)
if self.script_begin_tag and self.script_begin_tag in self.hql:
self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:])
问题是我无法弄清楚如何触发这段代码的模板渲染阶段之前被调用。我有一个基本的DAG这样的脚本:
from airflow import DAG
from airflow.operators.hive_operator import HiveOperator
from datetime import datetime, timedelta
default_args = dict(
owner='mpetronic',
depends_on_past=False,
start_date=datetime(2017, 5, 2),
verbose=True,
retries=1,
retry_delay=timedelta(minutes=5)
)
dag = DAG(
dag_id='report',
schedule_interval='* * * * *',
user_defined_macros=dict(a=1, b=2),
default_args=default_args)
hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read()
task = HiveOperator(
task_id='report_builder',
hive_cli_conn_id='hive_dv',
schema='default',
mapred_job_name='report_builder',
hiveconf_jinja_translate=True,
dag=dag,
hql=hql)
我可以看到我的user_defined_macros字典,使得它那里得到与随后被应用到我的HQL脚本以使其作为一个全球性的神社语境词典合并代码一个模板。但是,因为我的HQL是原生HQL,所以我想要更新的所有变量都是以$ {xxx}的形式出现的,而jinja只是跳过它们。我需要气流来首先调用prepare_template(),但只是没有看到如何做到这一点。
我意识到我可以手动将我的HQL $ {xxx}更改为{{xxx}},因为这有效,但看起来像是反模式。我希望脚本能够本地或通过气流工作。这是TaskInstance类中的函数,它确实呈现了我手动更改的{{xxx}}值:
def render_templates(self):
task = self.task
jinja_context = self.get_template_context()
if hasattr(self, 'task') and hasattr(self.task, 'dag'):
if self.task.dag.user_defined_macros:
jinja_context.update(
self.task.dag.user_defined_macros)
rt = self.task.render_template # shortcut to method
for attr in task.__class__.template_fields:
content = getattr(task, attr)
if content:
rendered_content = rt(attr, content, jinja_context)
setattr(task, attr, rendered_content)
能够在airflow 1.8中获得上述正常工作状态,hql文件中的$ {xxx}表单变量被替换为user_defined_macros中的相应条目。你可能试着在代码的abover阶段添加一个记录器吗?在我的情况下,能够看到相应替换的调用。 –