2017-05-04 153 views
1

在您通过HQL脚本传递$ {xxx} vars并将其预处理以将其转换为{{xxx}} Jinja样式在实际进行模板渲染的阶段之前,然后用用户提供的字典中的值替换它们。我相信这是因为没有在HiveOperator类这样的功能:

def prepare_template(self): 
    if self.hiveconf_jinja_translate: 
     self.hql = re.sub(
      "(\$\{([ a-zA-Z0-9_]*)\})", "{{ \g<2> }}", self.hql) 
    if self.script_begin_tag and self.script_begin_tag in self.hql: 
     self.hql = "\n".join(self.hql.split(self.script_begin_tag)[1:]) 

问题是我无法弄清楚如何触发这段代码的模板渲染阶段之前被调用。我有一个基本的DAG这样的脚本:

from airflow import DAG 
from airflow.operators.hive_operator import HiveOperator 
from datetime import datetime, timedelta 

default_args = dict(
    owner='mpetronic', 
    depends_on_past=False, 
    start_date=datetime(2017, 5, 2), 
    verbose=True, 
    retries=1, 
    retry_delay=timedelta(minutes=5) 
    ) 

dag = DAG(
    dag_id='report', 
    schedule_interval='* * * * *', 
    user_defined_macros=dict(a=1, b=2), 
    default_args=default_args) 

hql = open('/home/mpetronic/repos/airflow/resources/hql/report.hql').read() 

task = HiveOperator(
    task_id='report_builder', 
    hive_cli_conn_id='hive_dv', 
    schema='default', 
    mapred_job_name='report_builder', 
    hiveconf_jinja_translate=True, 
    dag=dag, 
    hql=hql) 

我可以看到我的user_defined_macros字典,使得它那里得到与随后被应用到我的HQL脚本以使其作为一个全球性的神社语境词典合并代码一个模板。但是,因为我的HQL是原生HQL,所以我想要更新的所有变量都是以$ {xxx}的形式出现的,而jinja只是跳过它们。我需要气流来首先调用prepare_template(),但只是没有看到如何做到这一点。

我意识到我可以手动将我的HQL $ {xxx}更改为{{xxx}},因为这有效,但看起来像是反模式。我希望脚本能够本地或通过气流工作。这是TaskInstance类中的函数,它确实呈现了我手动更改的{{xxx}}值:

def render_templates(self): 
    task = self.task 
    jinja_context = self.get_template_context() 
    if hasattr(self, 'task') and hasattr(self.task, 'dag'): 
     if self.task.dag.user_defined_macros: 
      jinja_context.update(
       self.task.dag.user_defined_macros) 

    rt = self.task.render_template # shortcut to method 
    for attr in task.__class__.template_fields: 
     content = getattr(task, attr) 
     if content: 
      rendered_content = rt(attr, content, jinja_context) 
      setattr(task, attr, rendered_content) 
+0

能够在airflow 1.8中获得上述正常工作状态,hql文件中的$ {xxx}表单变量被替换为user_defined_macros中的相应条目。你可能试着在代码的abover阶段添加一个记录器吗?在我的情况下,能够看到相应替换的调用。 –

回答

0

我想出了我的问题。这是说明方法使用正则表达式:

(\$\{([ a-zA-Z0-9_]*)\}) 

它不占的形式直线变量:

${hivevar:var_name} 

它不考虑在模式冒号。这种形式是使用直线在命名空间中定义Hive变量的更为标准的方式。为了使这个神社替代的工作,你必须引用仅使用${var_name}在HQL变量,但你只能使用定义在直线变量:

set hivevar:var_name=123; 

我认为,气流应全力支持hivevar:var_name风格命名空间的变量,当你直线运行beeline是Hive使用的首选客户端。