2013-03-11 87 views
4

我正在尝试创建Ooize协调器。问题是我已经有数据等待使用oozie进行处理。Oozie协调员。如何将过去的数据提供给mapreduce工作?

想象一下这样的情况。

  1. 当前日期是:2013年3月1日(2013年3月1日)

  2. 我确实有这些输入目录:

    /分期/着陆/源/ xvlr/2013/02/01/00(2013年2月第一个月,每天的第一个小时) /staging/landing/source/xvlr/2013/02/01/01

    /staging/landing/source/xvlr/2013/02/01/02

    /分段/降落/源极/ xvlr/2013/02/01/03

    /分段/降落/源极/ xvlr/2013/02/1月4日

    ....

    /分期/降落/源极/ xvlr/2013/02/28/00

    ...

    /分段/降落/源极/ xvlr/2013/02 /23分之28

我希望我的Oozie的协调,以消耗所有先前创建的登陆数据 和产生这样的输出:

/masterdata/source/xvlr/2013/02/01/00 
/masterdata/source/xvlr/2013/02/01/01 
/masterdata/source/xvlr/2013/02/01/02 
/masterdata/source/xvlr/2013/02/01/03 
/masterdata/source/xvlr/2013/02/01/04 
.... 
/masterdata/source/xvlr/2013/02/28/00 
... 
/masterdata/source/xvlr/2013/02/28/23 

然后我想我的协调运行每个小时,产生新的输出masterdata。

如何使用协调器规范来实现? 这是我的协调员。它什么也没做。它确实达到我需要的时间,然后等待。它没有开始工作。

请指教。

<coordinator-app name="Xvlr-parser-coordinator" frequency="60" 
       start="2013-03-07T05:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3"> 
    <controls> 
     <timeout>5</timeout> 
     <concurrency>4</concurrency> 
    </controls> 

    <datasets> 
     <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 
     <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T05:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 

    </datasets> 

    <input-events> 
     <data-in name="xvlrInputEvent" dataset="xvlrInputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-in> 
    </input-events> 

    <output-events> 
     <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-out> 
    </output-events> 
    <action> 
     <workflow> 
      <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path> 
      <configuration> 
       <property> 
        <name>inputDir</name> 
        <value>${coord:dataIn('xvlrInputEvent')}</value> 
       </property> 
       <property> 
        <name>outputDir</name> 
        <value>${coord:dataOut('xvlrOutputEvent')}</value> 
       </property> 

      </configuration> 

     </workflow> 
    </action> 
</coordinator-app> 

回答

4

这里是正确的解决方案(它的工作好几天了:)))):

<coordinator-app name="Xvlr-parser-coordinator" frequency="60" 
       start="2013-03-07T16:35Z" end="2113-01-01T00:35Z" timezone="Europe/Moscow" xmlns="uri:oozie:coordinator:0.3"> 
    <controls> 
     <timeout>3</timeout> 
     <concurrency>1</concurrency> 
    </controls> 

    <datasets> 
     <dataset name="xvlrInputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/staging/landing/source/xvlr/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 
     <dataset name="xvlrOutputDataset" frequency="${coord:hours(1)}" initial-instance="2013-03-07T16:35Z" timezone="Europe/Moscow"> 
      <uri-template>${nameNode}/masterdata/source/xvlr/archive/${YEAR}/${MONTH}/${DAY}/${HOUR}</uri-template> 
      <done-flag></done-flag> 
     </dataset> 

    </datasets> 

    <input-events> 
     <data-in name="xvlrInputEvent" dataset="xvlrInputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-in> 
    </input-events> 

    <output-events> 
     <data-out name="xvlrOutputEvent" dataset="xvlrOutputDataset"> 
      <instance>${coord:current(0)}</instance> 
     </data-out> 
    </output-events> 
    <action> 
     <workflow> 
      <app-path>${oozieAppHomeCatalog}/sub-workflows/Xvlr-parser-subworkflow.xml</app-path> 
      <configuration> 
       <property> 
        <name>inputDir</name> 
        <value>${coord:dataIn('xvlrInputEvent')}</value> 
       </property> 
       <property> 
        <name>outputDir</name> 
        <value>${coord:dataOut('xvlrOutputEvent')}</value> 
       </property> 
      </configuration> 

     </workflow> 
    </action> 
</coordinator-app> 

它有什么作用?

  • 起初它并开始2013-03-07T16:35Z,所以所有以前
    收集的数据已通过底层工作流传递(一个Mr工作 调用与分析功能)
    • 在处理“过去时间数据集”(数据集时间小于当前时间)时,工作流逐一运行:它确实消耗了 /pastdate/hour_00,然后立即开始消耗 /pastdate/hour_01等
    • 当协调员到达现在的时间,它开始每小时调用一次工作流程(按照设计:05:35,06:35 ... 23:35)。
    • 查看超时声明:我确实有缺失的数据集:例如3月第一个10小时没有数据。 工作流确实等了3分钟,然后死亡。

问题已解决。