我需要创建恢复模式。 在我的模式中,我只能在给定的时间窗口上启动作业。 如果作业失败,它只会在下一个时间窗口重新启动,并且完成后我想要启动为此窗口提前计划的计划作业。 作业之间唯一的不同是时间窗口参数。春季批次:重新启动作业,然后自动启动下一个作业
我想到JobExecutionDecider与JobExplorer结合或重写Joblauncher。但是,一切似乎太侵入。
我没有找到符合我需求的例子任何想法都将是最受欢迎的。
我需要创建恢复模式。 在我的模式中,我只能在给定的时间窗口上启动作业。 如果作业失败,它只会在下一个时间窗口重新启动,并且完成后我想要启动为此窗口提前计划的计划作业。 作业之间唯一的不同是时间窗口参数。春季批次:重新启动作业,然后自动启动下一个作业
我想到JobExecutionDecider与JobExplorer结合或重写Joblauncher。但是,一切似乎太侵入。
我没有找到符合我需求的例子任何想法都将是最受欢迎的。
只是为了回顾一下基于由incomplete-co.de提供的建议实际上是完成的。 我创建了一个类似于下面的恢复流程。恢复流程包装了我的实际批次,仅负责将正确的作业参数提供给内部作业。它可以是第一次执行时的初始参数,正常执行时的新参数或最后一次执行失败时的旧参数。
<batch:job id="recoveryWrapper"
incrementer="wrapperRunIdIncrementer"
restartable="true">
<batch:decision id="recoveryFlowDecision" decider="recoveryFlowDecider">
<batch:next on="FIRST_RUN" to="defineParametersOnFirstRun" />
<batch:next on="RECOVER" to="recover.batchJob " />
<batch:next on="CURRENT" to="current.batchJob " />
</batch:decision>
<batch:step id="defineParametersOnFirstRun" next="current.batchJob">
<batch:tasklet ref="defineParametersOnFirstRunTasklet"/>
</batch:step>
<batch:step id="recover.batchJob " next="current.batchJob">
<batch:job ref="batchJob" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor" />
</batch:step>
<batch:step id="current.batchJob" >
<batch:job ref="batchJob" job-launcher="jobLauncher"
job-parameters-extractor="jobParametersExtractor" />
</batch:step>
</batch:job>
解决方案的心脏是RecoveryFlowDecider的JobParametersExtractor同时使用春天批量重启机制。 RecoveryFlowDecider将查询JobExplorer和JobRepository以了解我们是否在上次运行中发生故障。它将把最后一次执行放在包装器的执行上下文中,以便稍后在JobParametersExtractor中使用。 请注意使用runIdIncremeter允许重新执行包装作业。
@Component
public class RecoveryFlowDecider implements JobExecutionDecider {
private static final String FIRST_RUN = "FIRST_RUN";
private static final String CURRENT = "CURRENT";
private static final String RECOVER = "RECOVER";
@Autowired
private JobExplorer jobExplorer;
@Autowired
private JobRepository jobRepository;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution
,StepExecution stepExecution) {
// the wrapper is named as the wrapped job + WRAPPER
String wrapperJobName = jobExecution.getJobInstance().getJobName();
String jobName;
jobName = wrapperJobName.substring(0,wrapperJobName.indexOf(EtlConstants.WRAPPER));
List<JobInstance> instances = jobExplorer.getJobInstances(jobName, 0, 1);
JobInstance internalJobInstance = instances.size() > 0 ? instances.get(0) : null;
if (null == internalJobInstance) {
return new FlowExecutionStatus(FIRST_RUN);
}
JobExecution lastExecution = jobRepository.getLastJobExecution(internalJobInstance.getJobName()
,internalJobInstance.getJobParameters());
//place the last execution on the context (wrapper context to use later)
jobExecution.getExecutionContext().put(EtlConstants.LAST_EXECUTION, lastExecution);
ExitStatus exitStatus = lastExecution.getExitStatus();
if (ExitStatus.FAILED.equals(exitStatus) || ExitStatus.UNKNOWN.equals(exitStatus)) {
return new FlowExecutionStatus(RECOVER);
}else if(ExitStatus.COMPLETED.equals(exitStatus)){
return new FlowExecutionStatus(CURRENT);
}
//We should never get here unless we have a defect
throw new RuntimeException("Unexpecded batch status: "+exitStatus+" in decider!");
}
}
然后JobParametersExtractor将最后执行的结果再次测试,在失败作业的情况下,它将成为用于执行失败的作业触发春季Bacth重启机制的原始参数。否则,它将创建一组新的参数并在正常过程中执行。
@Component
public class JobExecutionWindowParametersExtractor implements
JobParametersExtractor {
@Override
public JobParameters getJobParameters(Job job, StepExecution stepExecution) {
// Read the last execution from the wrapping job
// in order to build Next Execution Window
JobExecution lastExecution= (JobExecution) stepExecution.getJobExecution().getExecutionContext().get(EtlConstants.LAST_EXECUTION);;
if(null!=lastExecution){
if (ExitStatus.FAILED.equals(lastExecution.getExitStatus())) {
JobInstance instance = lastExecution.getJobInstance();
JobParameters parameters = instance.getJobParameters();
return parameters;
}
}
//We do not have failed execution or have no execution at all we need to create a new execution window
return buildJobParamaters(lastExecution,stepExecution);
}
...
}
是否有可能以相反的方式做到这一点?
在每个时间窗口中,提交该时间窗口预定的作业。
但是,作业的第一步应检查前一时间窗口中的作业是否成功完成。如果之前失败了,那么提交以前的工作,并等待完成,然后再进入自己的逻辑。
你有没有考虑过JobStep?也就是说,一个步骤确定是否有任何其他工作要运行。这个值被设置到StepExecutionContext中。一个JobExecutionDecider然后检查这个值;如果存在,则指向启动作业的JobStep。
这里就可以了DOC http://docs.spring.io/spring-batch/reference/htmlsingle/#external-flows
感谢: JobSteps听起来是个好主意,我已经写了jobParametersExtractor提供我与预期的参数(基于故障运行和新的运行)。 现在我想在外部工作中整合基于决策者的两个工具。 听起来合乎逻辑吗? –
是的 - 决定者可以决定是否采取JobStep或者不采用JobStep,真正打开你以编程方式表达流程逻辑。记住你可以通过在构造函数中传入你喜欢的任何字符串来创建自己的FlowExecutionStatus,它将成为XML路由可用的值。 –