2016-11-28 62 views
2

如何使用Apache Apex创建批处理应用程序?如何使用Apex进行批处理?

我发现的所有示例都是流式应用程序,这意味着它们不会结束,我希望我的应用程序在处理完所有数据后关闭它。

谢谢

回答

2

您可以在运行应用程序之前添加退出条件。 例如

public void testMapOperator() throws Exception 
{ 
    LocalMode lma = LocalMode.newInstance(); 
    DAG dag = lma.getDAG(); 

    NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator()); 
    FunctionOperator.MapFunctionOperator<Integer, Integer> mapper 
    = dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square())); 
    ResultCollector collector = dag.addOperator("collector", new ResultCollector()); 

    dag.addStream("raw numbers", numGen.output, mapper.input); 
    dag.addStream("mapped results", mapper.output, collector.input); 

// Create local cluster 
    LocalMode.Controller lc = lma.getController(); 
    lc.setHeartbeatMonitoringEnabled(false); 

//Condition to exit the application 
    ((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>() 
    { 
    @Override 
    public Boolean call() throws Exception 
    { 
     return TupleCount == NumTuples; 
    } 
    }); 

    lc.run(); 

    Assert.assertEquals(sum, 285); 
} 

完整的代码参照https://github.com/apache/apex-malhar/blob/master/stream/src/test/java/org/apache/apex/malhar/stream/FunctionOperator/FunctionOperatorTest.java

+0

在运行环境方面,一些更一般的解决方案如何?我想有可能选择是本地还是集群环境。 – Krever

4

什么是您的使用情况?本地支持批处理是在路线图上,目前正在开展工作。或者,直到那时,一旦确定处理完成,输入操作员可以发送ShutdownException()信号,并通过DAG传播并关闭DAG。

让我们知道您是否需要进一步的细节。

+0

我正在写几乎所有开源bigdata处理引擎的比较作为我的msc论文。我想创建一个顶点批处理部分(与Mapreduce,Flink和Spark一致)。我现在可能会跳过它,继续进行流比较。 – Krever

+0

当然。使用它的方式是:在你的endWindow()调用中,检查你的任务是否完成 - 需要一些自定义逻辑。如果你的任务已经完成,调用ShuddownException()并且你的整个管道将关闭。 –