2016-05-04 93 views
1

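Spring Batch scheduler: job listener only works the first time the job runs

I am working on a Spring Batch application that runs two batch jobs using Java configuration. Recently I added a Spring scheduler to schedule one of the jobs I wrote. The listener is invoked when the job completes for the first time, but it is not invoked after subsequent executions. Here is my job configuration code: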

@Configuration 
@EnableBatchProcessing 
public class BatchConfiguration{ 

    @Autowired 
    public JobBuilderFactory jobBuilderFactory; 

    @Autowired 
    public StepBuilderFactory stepBuilderFactory; 

    @Autowired 
    public MongoTemplate mongoTemplate; 

    @Autowired 
    UnitsRepository unitsRepos; 

    @Autowired 
    UserRepository userRepository; 

    @Autowired 
    ElectraService electraService; 

    /*@Autowired InfrastructureConfiguration infrastructureConfiguration;*/ 

    // tag::readerwriterprocessor[] 
    @Bean 
    @StepScope 
    public MongoItemReader<UserBean> reader() { 
     MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>(); 
     reader.setTemplate(mongoTemplate); 
     reader.setCollection("user"); 
     reader.setQuery("{ '_id': 'U3'}"); 
     reader.setSort(new HashMap<String,Direction>(){{put("_id", Direction.ASC);}}); 
     reader.setTargetType(UserBean.class); 
     return reader; 
    } 

    @Bean 
    public ExceedUsageProcessor processor() { 
     return new ExceedUsageProcessor(unitsRepos,electraService); 
    } 

    @Bean 
    public AnomalyProcessor anomalyProcessor() { 
     return new AnomalyProcessor(unitsRepos); 
    } 
    @Bean 
    @StepScope 
    public MongoItemWriter<DayByDayUsage> writer() { 
     MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>(); 
     writer.setTemplate(mongoTemplate); 
     writer.setCollection("usage"); 
     return writer; 
    } 
    // end::readerwriterprocessor[] 

    // tag::listener[] 

    @Bean 
    @StepScope 
    public MongoItemWriter<AnomalyBean> anomalyWriter() { 
     MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>(); 
     writer.setTemplate(mongoTemplate); 
     writer.setCollection("anomaly"); 
     return writer; 
    } 

    @Bean 
    public ExceedJobNotificationListener listener() { 
     return new ExceedJobNotificationListener(mongoTemplate); 
    } 

    @Bean 
    public AnomalyJobListener anomalyListener(){ 
     return new AnomalyJobListener(mongoTemplate,userRepository); 

    } 
    // end::listener[] 

    // tag::jobstep[] 
    @Bean 
    public Job notifyUserJob() { 
     return jobBuilderFactory.get("notifyUserJob") 
       .incrementer(new RunIdIncrementer()) 
       .listener(listener()) 
       .flow(step1()) 
       .end() 
       .build(); 
    } 

    @Bean 
    public Job anomalyJob() { 
     return jobBuilderFactory.get("anomalyJob") 
       .incrementer(new RunIdIncrementer()) 
       .listener(anomalyListener()) 
       .flow(step2()) 
       .end() 
       .build(); 
    } 
    @Bean 
    public Step step1() { 
     return stepBuilderFactory.get("step1") 
       .<UserBean, DayByDayUsage> chunk(50) 
       .reader(reader()) 
       .processor(processor()) 
       .writer(writer()) 
       .taskExecutor(taskExecutor()) 
       .throttleLimit(10) 
       .allowStartIfComplete(true) 
       .build(); 
    } 
    // end::jobstep[] 

    @Bean 
    public Step step2() { 
     return stepBuilderFactory.get("step2") 
       .<UserBean, AnomalyBean> chunk(50) 
       .reader(reader()) 
       .processor(anomalyProcessor()) 
       .writer(anomalyWriter()) 
       .taskExecutor(taskExecutor()) 
       .throttleLimit(10) 
       .allowStartIfComplete(true) 
       .build(); 
    } 

    @Bean 
    public TaskExecutor taskExecutor() { 
     // TODO Auto-generated method stub 
     ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
     taskExecutor.setMaxPoolSize(10); 
     taskExecutor.afterPropertiesSet(); 
     return taskExecutor; 
    } 

    @Bean 
    public DataSource dataSource() { 
     return new EmbeddedDatabaseBuilder() 
       .setType(EmbeddedDatabaseType.HSQL) 
       .addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql") 
       .build(); 
    } 
} 

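And here is my scheduler code:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class AnomalyScheduler {
    private Job myImportJob;
    private JobLauncher jobLauncher;

    @Autowired
    public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob) {
        this.myImportJob = myImportJob;
        this.jobLauncher = jobLauncher;
    }

    // Launch anomalyJob 60 seconds after the previous launch has finished.
    @Scheduled(fixedDelay = 60000)
    public void runJob() {
        try {
            jobLauncher.run(myImportJob, new JobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            e.printStackTrace();
        }
    }
}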

My listener is as follows:

public class AnomalyJobListener extends JobExecutionListenerSupport { 
    private PushNotification pushNotification = PushNotification 
      .getPushNotificationInstance(); 

    @Autowired 
    public AnomalyJobListener(MongoTemplate mongoTemplate, 
      UserRepository userRepository) { 
     List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0); 
     anomalies = mongoTemplate.findAll(AnomalyBean.class); 
     int numAnomalies = anomalies.size(); 
     List<UserBean> admins = new ArrayList<UserBean>(0); 
     admins = userRepository.userByType("admin"); 
     if (numAnomalies > 0) { 
      for (UserBean admin : admins) { 
       pushNotification.pushNotification(numAnomalies 
         + " anomalies detected ! Keep an eye on that.", 
         admin.getDeveiceId()); 
      } 

     } 
    } 
} 

Here is the console output:

2016-05-04 08:17:39.565 INFO 9348 --- [   main] com.electra.Application     : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs) 
    2016-05-04 08:17:39.571 INFO 9348 --- [   main] com.electra.Application     : No active profile set, falling back to default profiles: default 
    2016-05-04 08:17:39.681 INFO 9348 --- [   main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy 
    2016-05-04 08:17:41.943 WARN 9348 --- [   main] o.s.c.a.ConfigurationClassEnhancer  : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details. 
    2016-05-04 08:17:41.966 WARN 9348 --- [   main] o.s.c.a.ConfigurationClassEnhancer  : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details. 
    2016-05-04 08:17:42.258 INFO 9348 --- [   main] o.s.j.d.e.EmbeddedDatabaseFactory  : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa' 
    2016-05-04 08:17:42.646 INFO 9348 --- [   main] o.s.jdbc.datasource.init.ScriptUtils  : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] 
    2016-05-04 08:17:42.658 INFO 9348 --- [   main] o.s.jdbc.datasource.init.ScriptUtils  : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms. 
    2016-05-04 08:18:01.793 INFO 9348 --- [   main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 
    2016-05-04 08:18:01.812 INFO 9348 --- [   main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor' 
    push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ] 
    push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ] 
    2016-05-04 08:18:16.883 INFO 9348 --- [   main] o.s.jdbc.datasource.init.ScriptUtils  : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] 
    2016-05-04 08:18:16.927 INFO 9348 --- [   main] o.s.jdbc.datasource.init.ScriptUtils  : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms. 
    2016-05-04 08:18:17.392 INFO 9348 --- [   main] o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup 
    2016-05-04 08:18:17.413 INFO 9348 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 
    2016-05-04 08:18:17.737 INFO 9348 --- [   main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: [] 
    2016-05-04 08:18:17.766 INFO 9348 --- [   main] o.s.b.c.r.s.JobRepositoryFactoryBean  : No database type set, using meta data indicating: HSQL 
    2016-05-04 08:18:18.032 INFO 9348 --- [   main] o.s.b.c.l.support.SimpleJobLauncher  : No TaskExecutor has been set, defaulting to synchronous executor. 
    2016-05-04 08:18:18.147 INFO 9348 --- [   main] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}] 
    2016-05-04 08:18:18.187 INFO 9348 --- [   main] o.s.batch.core.job.SimpleStepHandler  : Executing step: [step1] 
    2016-05-04 08:18:22.044 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}] 
    2016-05-04 08:18:22.079 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler  : Executing step: [step2] 
    2016-05-04 08:18:28.990 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Mon May 02 08:18:12 IST 1 
    2016-05-04 08:18:28.991 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl  : push new unit 
    2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Wed May 04 08:18:32 IST 2016 
    2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl  : push new unit 
    2016-05-04 08:19:16.876 INFO 9348 --- [   main] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] 
    2016-05-04 08:19:16.999 INFO 9348 --- [   main] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}] 
    2016-05-04 08:19:17.053 INFO 9348 --- [   main] o.s.batch.core.job.SimpleStepHandler  : Executing step: [step2] 
    2016-05-04 08:19:17.491 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] 
    2016-05-04 08:19:52.399 INFO 9348 --- [   main] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] 
    2016-05-04 08:19:52.401 INFO 9348 --- [   main] com.electra.Application     : Started Application in 133.639 seconds (JVM running for 134.724) 
    2016-05-04 08:20:21.066 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}] 
    2016-05-04 08:20:21.288 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler  : Executing step: [step2] 
    2016-05-04 08:20:31.103 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher  : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED] 

Please tell me what I am doing wrong and why the listener is not executed on subsequent runs.

+0

Have you added '@EnableScheduling' to your configuration? – joshiste

+0

I added this annotation on top of my BatchConfiguration class, but it made no difference. My listener is still only invoked the first time. –

+0

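So the job does run? You only have code in your listener's constructor, not in beforeJob(), afterJob(), or any other method... so I guess the listener is called but does nothing... – joshiste

Answer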

0

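You only have code in your listener's constructor, not in beforeJob(), afterJob(), or any other method. So I guess the listener is being called but does nothing. You need to override the appropriate listener methods.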
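To illustrate why the notification fired only once, here is a minimal plain-Java sketch. It does not use Spring Batch: JobListener, runJob, and both listener classes are hypothetical stand-ins invented for this example. It only assumes what the answer states, namely that the framework invokes the listener's callback after each execution, while a singleton bean's constructor runs once when the bean is created.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spring Batch's JobExecutionListener (simplified).
interface JobListener {
    void afterJob(String jobName);
}

// Mirrors the listener in the question: all the work sits in the constructor.
class ConstructorOnlyListener implements JobListener {
    final List<String> notifications = new ArrayList<>();

    ConstructorOnlyListener() {
        // Runs exactly once, when the object (the bean) is instantiated.
        notifications.add("notified from constructor");
    }

    @Override
    public void afterJob(String jobName) {
        // Intentionally empty, like an inherited no-op callback.
    }
}

// The suggested fix: the work lives in the callback instead.
class CallbackListener implements JobListener {
    final List<String> notifications = new ArrayList<>();

    @Override
    public void afterJob(String jobName) {
        // Runs every time the launcher reports a finished execution.
        notifications.add("notified after " + jobName);
    }
}

class ListenerLifecycleDemo {
    // Hypothetical launcher: pretends the job ran, then calls the listener.
    static void runJob(String jobName, JobListener listener) {
        listener.afterJob(jobName);
    }

    public static void main(String[] args) {
        ConstructorOnlyListener broken = new ConstructorOnlyListener();
        CallbackListener fixed = new CallbackListener();
        for (int run = 0; run < 3; run++) {
            runJob("anomalyJob", broken);
            runJob("anomalyJob", fixed);
        }
        System.out.println("constructor-only notifications: " + broken.notifications.size());
        System.out.println("callback notifications: " + fixed.notifications.size());
    }
}
```

Applied to the real AnomalyJobListener, this would mean keeping only the field assignments (mongoTemplate, userRepository) in the constructor and moving the query-and-push logic into an overridden afterJob(JobExecution jobExecution).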

+0

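I added this to my code and the listener problem was solved, but now I have started getting this exception: 'org.springframework.dao.ConcurrencyFailureException: PreparedStatementCallback; SQL [INSERT into BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) values (?, ?, ?, ?)]; transaction rollback: serialization failure; nested exception is java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure' –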

+0

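OK, got it. Using the HSQL driver in MVCC mode resolved this exception. But there is one small glitch: my job is launched twice at startup, and only after that does it start running on the schedule. Please help me if you can. –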

+0

Set 'spring.batch.job.enabled=false' to prevent the jobs from running at startup – joshiste
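For context, the console output above shows JobLauncherCommandLineRunner launching both notifyUserJob and anomalyJob during startup, in addition to the scheduled launch. The usual place for the suggested setting is a line in application.properties; the sketch below shows an alternative that assumes a Spring Boot application and sets the same key as a JVM system property before the context starts. The class name DisableStartupJobs is hypothetical, and the SpringApplication call is left as a comment because the real entry point is not shown in the thread.

```java
class DisableStartupJobs {
    // Property key taken from the comment above.
    static final String PROPERTY = "spring.batch.job.enabled";

    // Sets the flag as a system property; it must be called before the
    // application context is created for it to have any effect.
    static void configure() {
        System.setProperty(PROPERTY, "false");
    }

    public static void main(String[] args) {
        configure();
        // Assumed entry point, not shown in the original post:
        // SpringApplication.run(Application.class, args);
        System.out.println(PROPERTY + "=" + System.getProperty(PROPERTY));
    }
}
```

With the startup run disabled, only the '@Scheduled' method in AnomalyScheduler would be launching anomalyJob.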
