2
A
回答
2
您可以在运行应用程序之前添加退出条件。 例如
public void testMapOperator() throws Exception
{
LocalMode lma = LocalMode.newInstance();
DAG dag = lma.getDAG();
NumberGenerator numGen = dag.addOperator("numGen", new NumberGenerator());
FunctionOperator.MapFunctionOperator<Integer, Integer> mapper
= dag.addOperator("mapper", new FunctionOperator.MapFunctionOperator<Integer, Integer>(new Square()));
ResultCollector collector = dag.addOperator("collector", new ResultCollector());
dag.addStream("raw numbers", numGen.output, mapper.input);
dag.addStream("mapped results", mapper.output, collector.input);
// Create local cluster
LocalMode.Controller lc = lma.getController();
lc.setHeartbeatMonitoringEnabled(false);
//Condition to exit the application
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return TupleCount == NumTuples;
}
});
lc.run();
Assert.assertEquals(sum, 285);
}
4
什么是您的使用情况?本地支持批处理是在路线图上,目前正在开展工作。或者,直到那时,一旦确定处理完成,输入操作员可以发送ShutdownException()信号,并通过DAG传播并关闭DAG。
让我们知道您是否需要进一步的细节。
+0
我正在写几乎所有开源bigdata处理引擎的比较作为我的msc论文。我想创建一个顶点批处理部分(与Mapreduce,Flink和Spark一致)。我现在可能会跳过它,继续进行流比较。 – Krever
+0
当然。使用它的方式是:在你的endWindow()调用中,检查你的任务是否完成 - 需要一些自定义逻辑。如果你的任务已经完成,调用ShuddownException()并且你的整个管道将关闭。 –
相关问题
- 1. 如何在MSBuild中进行批处理?
- 2. 如何使用批处理
- 3. 如何使用批处理
- 4. 如何使用批处理
- 5. 如何使用批处理
- 6. 如何使用批处理
- 7. 如何使用批处理
- 8. 如何使用DynamoDBContext对结果进行批处理
- 9. Tensorflow-如何使用MNIST数据集进行全批处理?
- 10. 如何运行批处理?
- 11. 如何运行批处理批
- 12. 使用批处理程序进行地理编码
- 13. 如何从批处理脚本中运行批处理脚本?
- 14. 使用批处理多进程
- 15. Apex批处理不执行所有记录
- 16. 批处理类Apex中的执行混乱
- 17. 使用进程和运行时类执行批处理文件
- 18. 使用批处理
- 19. 使用批处理
- 20. 使用批处理
- 21. 使用批处理
- 22. 使用批处理
- 23. 使用批处理
- 24. 使用批处理
- 25. 使用批处理
- 26. 用参数执行批处理使用进程
- 27. 如何使用Hibernate批处理
- 28. 如何使用Windows批处理
- 29. 如何使用webgl实现批处理?
- 30. 如何使用tf.cond批处理
在运行环境方面,一些更一般的解决方案如何?我想有可能选择是本地还是集群环境。 – Krever