2015-04-28 220 views
2

是否可以在单个应用程序中使用这两个启动器?spring-boot-starter-jta-atomikos和spring-boot-starter-batch

我想将CSV文件中的记录加载到数据库表中。 Spring Batch表存储在不同的数据库中,所以我假设我需要使用JTA来处理事务。

无论何时将@EnableBatchProcessing添加到我的@Configuration类中,它都会配置一个PlatformTransactionManager,这会阻止Atomikos自动配置它。

是否有任何弹簧引导+批+ jta样本显示如何做到这一点?

非常感谢, 詹姆斯

回答

1

我只是通过这个去了,我发现的东西,似乎工作。正如您所注意到的,@EnableBatchProcessing会导致创建DataSourceTransactionManager,这会混淆一切。我在@EnableBatchProcessing中使用了modular = true,所以ModularBatchConfiguration类被激活。

我所做的就是停止使用@EnableBatchProcessing,而是将整个ModularBatchConfiguration类复制到我的项目中。然后我注释掉了transactionManager()方法,因为Atomikos配置创建了JtaTransactionManager。我也不得不重写jobRepository()方法,因为它被硬编码为使用在DefaultBatchConfiguration内创建的DataSourceTransactionManager。我也不得不明确导入JtaAutoConfiguration类。这将电线正确连接(根据执行器的“豆”端点 - 感谢上帝)。但是当你运行它时,事务管理器会抛出一个异常,因为某处某处设置了明确的事务隔离级别。所以我也写了一个BeanPostProcesso r找到交易管理器并打电话给txnMgr.setAllowCustomIsolationLevels(true);

现在一切正常,但作业正在运行时,即使我能看到SQLYog中的数据,我也无法使用JdbcTemplate从batch_step_execution表中获取当前数据。这必须与事务隔离有关,但我还没有能够理解它。

这是我的配置类,从Spring复制并修改如上所述。 PS,我有我的DataSource指向批注表注释为@Primary的数据库。此外,我将我的DataSource豆类更改为org.apache.tomcat.jdbc.pool.XADataSource;我不确定是否有必要。

@Configuration 
@Import(ScopeConfiguration.class) 
public class ModularJtaBatchConfiguration implements ImportAware 
{ 
    @Autowired(required = false) 
    private Collection<DataSource> dataSources; 

    private BatchConfigurer configurer; 

    @Autowired 
    private ApplicationContext context; 

    @Autowired(required = false) 
    private Collection<BatchConfigurer> configurers; 

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar(); 

    @Bean 
    public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception 
    { 
     JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean(); 
     factory.setDataSource(batchDataSource); 
     factory.setTransactionManager(jtaTransactionManager); 
     factory.afterPropertiesSet(); 
     return factory.getObject(); 
    } 

    @Bean 
    public JobLauncher jobLauncher() throws Exception { 
     return getConfigurer(configurers).getJobLauncher(); 
    } 

// @Bean 
// public PlatformTransactionManager transactionManager() throws Exception { 
//  return getConfigurer(configurers).getTransactionManager(); 
// } 

    @Bean 
    public JobExplorer jobExplorer() throws Exception { 
     return getConfigurer(configurers).getJobExplorer(); 
    } 

    @Bean 
    public AutomaticJobRegistrar jobRegistrar() throws Exception { 
     registrar.setJobLoader(new DefaultJobLoader(jobRegistry())); 
     for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) { 
      registrar.addApplicationContextFactory(factory); 
     } 
     return registrar; 
    } 

    @Bean 
    public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception { 
     return new JobBuilderFactory(jobRepository); 
    } 

    @Bean 
    // hopefully this will autowire the Atomikos JTA txn manager 
    public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception { 
     return new StepBuilderFactory(jobRepository, ptm); 
    } 

    @Bean 
    public JobRegistry jobRegistry() throws Exception { 
     return new MapJobRegistry(); 
    } 

    @Override 
    public void setImportMetadata(AnnotationMetadata importMetadata) { 
     AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
       EnableBatchProcessing.class.getName(), false)); 
     Assert.notNull(enabled, 
       "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName()); 
    } 

    protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception { 
     if (this.configurer != null) { 
      return this.configurer; 
     } 
     if (configurers == null || configurers.isEmpty()) { 
      if (dataSources == null || dataSources.isEmpty()) { 
       throw new UnsupportedOperationException("You are screwed"); 
      } else if(dataSources != null && dataSources.size() == 1) { 
       DataSource dataSource = dataSources.iterator().next(); 
       DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource); 
       configurer.initialize(); 
       this.configurer = configurer; 
       return configurer; 
      } else { 
       throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than" + 
                 "one DataSource, found " + dataSources.size()); 
      } 
     } 
     if (configurers.size() > 1) { 
      throw new IllegalStateException(
        "To use a custom BatchConfigurer the context must contain precisely one, found " 
          + configurers.size()); 
     } 
     this.configurer = configurers.iterator().next(); 
     return this.configurer; 
    } 

} 

@Configuration 
class ScopeConfiguration { 

    private StepScope stepScope = new StepScope(); 

    private JobScope jobScope = new JobScope(); 

    @Bean 
    public StepScope stepScope() { 
     stepScope.setAutoProxy(false); 
     return stepScope; 
    } 

    @Bean 
    public JobScope jobScope() { 
     jobScope.setAutoProxy(false); 
     return jobScope; 
    } 

} 
+0

最后,即使这并没有为我工作。如果没有让Atomikos JTA Txn Mgr疯狂并锁定并杀死我所有的工作,我无法查询数据库。然后我意识到我的第二个数据源只读取一个作业,所以我将所有配置恢复为标准的非JTA配置,完全取出Atomikos,并创建了第二个只读数据源作为Tomcat DataSource池bean,其中autoCommit =只有在特定工作启动时才是真实的。 –

0

我找到了一个解决方案,我能够保持@EnableBatchProcessing但不得不实施BatchConfigurer和Atomikos公司豆类,看到我的完整的答案在这so answer