2014-10-03 55 views
1

我在Oozie中实现自定义异步操作时遇到问题。我的类从ActionExecutor扩展,并覆盖方法initActionType,start,end,check,kill和isCompleted。Oozie自定义异步操作

在启动方法中,我想启动一个YARN作业,这是通过我的BiohadoopClient类实现的。要拨打电话异步的,我包裹在一个可调用的client.run()方法:

public void start(final Context context, final WorkflowAction action) { 
... 
    Callable<String> biohadoop = new Callable<String>() { 
    BiohadoopClient client = new BiohadoopClient(); 
    client.run(); 
    } 

    // submit callable to executor 
    executor.submit(biohadoop); 

    // set the start data, according to https://oozie.apache.org/docs/4.0.1/DG_CustomActionExecutor.html 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
... 
} 

这工作得很好,例如,当我用我的自定义操作在一个叉/加入的方式,对执行动作并行运行。

现在,问题在于,Oozie对于此操作仍处于RUNNING状态。似乎不可能将其改为已完成的状态。 Oozie永远不会调用check()方法,对于end()方法也是如此。它无助于在Callable(在client.run()完成之后)手动设置context.setExternalStatus(),context.setExecutionData()和context.setEndData()。我也尝试手动排队ActionEndXCommand,但没有运气。

当我在Callable的start()方法中等待完成时,状态得到正确更新,但fork/join中的执行不再平行(这似乎是逻辑,因为执行等待Callable完成)。

How external clients notify Oozie workflow with HTTP callback没有帮助,因为使用回调似乎没有任何变化(嗯,我可以看到它发生在日志文件中,但除此之外,什么都没有......)。此外,答案中提到,SSH操作异步运行,但我还没有发现这是如何完成的。 Callable中有一些包装,但最后,Callable的call()方法被直接调用(不提交给Executor)。

到目前为止,我还没有找到任何示例如何编写异步自定义操作。任何人都可以帮我吗?

感谢

编辑

这里是initActionType的实现(),启动(),检查(),()结束时,可调用的实现可以开始()动作中找到。

将可调用对象提交给start()操作中的执行程序,之后调用其shutdown()方法 - 以便执行程序在Callable完成后关闭。下一步,调用context.setStartData(externalId,callBackUrl,callBackUrl)。

private final AtomicBoolean finished = new AtomicBoolean(false); 

public void initActionType() { 
    super.initActionType(); 
    log.info("initActionType() invoked"); 
} 

public void start(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("start() invoked"); 

    // Get parameters from Node configuration 
    final String parameter = getParameters(action.getConf()); 

    Callable<String> biohadoop = new Callable<String>() { 
     @Override 
     public String call() throws Exception { 
      log.info("Starting Biohadoop"); 

      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      BiohadoopClient client = new BiohadoopClient(); 
      client.run(parameter); 
      log.info("Biohadoop finished");    

      finished.set(true); 
      // No difference if check() is called manually 
      // or if the next line is commented out 
      check(context, action); 

      return null; 
     } 
    }; 

    ExecutorService executor = Executors.newCachedThreadPool(); 
    biohadoopResult = executor.submit(biohadoop); 
    executor.shutdown(); 

    String externalId = action.getId(); 
    String callBackUrl = context.getCallbackUrl("finished"); 
    context.setStartData(externalId, callBackUrl, callBackUrl); 
} 

public void check(final Context context, final WorkflowAction action) 
     throws ActionExecutorException { 
    // finished is an AtomicBoolean, that is set to true, 
    // after Biohadoop has finished (see implementation of Callable) 
    if (finished.get()) { 
     log.info("check(Context, WorkflowAction) invoked - 
      Callable has finished"); 
     context.setExternalStatus(Status.OK.toString()); 
     context.setExecutionData(Status.OK.toString(), null); 
    } else { 
     log.info("check(Context, WorkflowAction) invoked"); 
     context.setExternalStatus(Status.RUNNING.toString()); 
    } 
} 

public void end(Context context, WorkflowAction action) 
     throws ActionExecutorException { 
    log.info("end(Context, WorkflowAction) invoked"); 
    context.setEndData(Status.OK, Status.OK.toString()); 
} 
+0

您能说明如何实现check()和initActionType()方法,以及如何在Callable中实现call()方法? – 2014-10-03 19:25:22

+0

@SSaikia_JtheRocker:我添加了实现 – gappc 2014-10-03 21:37:00

回答

0

一两件事 - 我可以看到你关闭你已经提交作业后立即执行者 - executor.shutdown();。这可能会导致问题。你能否试着将这个陈述改为end()方法?

+0

感谢您的想法。我试过了,但没有区别。JavaDoc非常清楚如何使用shutdown:'启动一个有序的关闭,其中执行先前提交的任务,但不会接受任何新任务。如果已关闭,调用没有其他影响。 此方法不会等待先前提交的任务完成执行。# 也许您有其他想法? – gappc 2014-10-09 14:29:19

+0

@gappc:我怀疑AtomicBoolean变量是不是更新了什么。请从start()方法中删除检查方法语句,然后检查您在check()中执行的日志消息是否可见。另外,记录finished.get()的值。如果您可以使用JUnit测试用例进行测试,情况会好很多。 – 2014-10-10 10:15:18

+0

AtomicBoolean设置正确,我可以从日志文件中看到,如果我手动调用check()(不同的日志输出)。如果我删除手动检查()调用,check()和end()根本不会被调用。对于JUnit测试的情况:你是对的:)但是在目前的阶段,所需的解决方案根本无法工作,额外的努力并不值得 - 至少在我看来。我从oozie邮件列表中得到了一些答案,我现在正在尝试,我会告诉你关于进度的信息 – gappc 2014-10-11 13:39:17

0

最后,我没有找到问题的“真正”解决方案。为我工作的解决方案是实现一个动作,它使用Java Executor框架并行调用Biohadoop实例。在调用之后,我等待(仍然在动作中)线程完成