2015-09-26 91 views
1

数据库写入未按预期回滚。 我花了很多时间阅读软件文档和网页张贴。 我一直无法解决问题。使用Spring,JOOQ,Postgres和ActiveMQ时不会回滚

我希望你们可以帮助我。

情景

  • 我的应用程序从队列拉出一个消息,从所述
    消息中提取数据,并将其写入到数据库。
  • 向数据库写入的方法执行2次SQL插入。
  • 第二次插入会得到一个异常:org.postgresql.util.PSQLException:错误:重复的键值违反了唯一约束“table2_PK”
  • 但是,第一次插入仍然被提交到数据库。

相关软件

  1. 春季启动1.2.5.RELEASE
  2. Atomikos公司-UTIL 3.9.3(从弹簧引导起动JTA-Atomikos公司1.2.5.RELEASE)
  3. jooq 3.6.2
  4. 的PostgreSQL 9.4-1201-jdbc41
  5. ActiveMQ的客户端5.1.2

应用程序代码 - 我在下面粘贴了我的代码的相关部分。

  1. GdmServer - 我的 “服务器” 类,这也宣告的Spring bean 配置
  2. PortSIQueue - 我的JMS MessageListener类
  3. 内核 - 我的工人阶级,即写入数据库的代码,一个Spring bean由我的MessageListener调用

我很感谢任何人都可以提供的帮助。

感谢


package com.sm.gis.gdm; 

import javax.transaction.SystemException; 
import javax.transaction.UserTransaction; 

import org.apache.activemq.ActiveMQXAConnectionFactory; 
import org.jooq.DSLContext; 
import org.jooq.SQLDialect; 
import org.jooq.impl.DSL; 
import org.postgresql.xa.PGXADataSource; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.jms.annotation.EnableJms; 
import org.springframework.jms.core.JmsTemplate; 
import org.springframework.jms.listener.DefaultMessageListenerContainer; 
import org.springframework.transaction.PlatformTransactionManager; 
import org.springframework.transaction.annotation.EnableTransactionManagement; 
import org.springframework.transaction.jta.JtaTransactionManager; 

import com.atomikos.icatch.jta.UserTransactionImp; 
import com.atomikos.icatch.jta.UserTransactionManager; 
import com.atomikos.jms.AtomikosConnectionFactoryBean; 
import com.sm.gis.config.GisConfig; 

@SpringBootApplication 
@EnableJms 
@EnableTransactionManagement 
public class GdmServer { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    GisConfig      gisConfig; 

    /** 
    * Starts the GDM Server 
    */ 
    public static void main(String[] args) { 
     SpringApplication.run(GdmServer.class, args); 
    } 

    // ------------------------------------------------------------------------- 
    // Spring bean configurations 
    // ------------------------------------------------------------------------- 

    @Bean 
    GisConfig gisConfig() { 
     return new GisConfig(); 
    } 

    @Bean 
    PlatformTransactionManager transactionManager() throws SystemException { 
     JtaTransactionManager manager = new JtaTransactionManager(); 
     manager.setTransactionManager(atomikosUserTransactionManager()); 
     manager.setUserTransaction (atomikosUserTransaction()); 
     manager.setAllowCustomIsolationLevels(true); 
     return manager; 
    } 

    @Bean(initMethod = "init", destroyMethod = "close") 
    UserTransactionManager atomikosUserTransactionManager() throws SystemException { 
     UserTransactionManager manager = new UserTransactionManager(); 
     manager.setStartupTransactionService(true); 
     manager.setForceShutdown(false); 
     manager.setTransactionTimeout(gisConfig.getTxnTimeout()); 
     return manager; 
    } 

    @Bean 
    UserTransaction atomikosUserTransaction() { 
     return new UserTransactionImp(); 
    } 

    @Bean(initMethod = "init", destroyMethod = "close") 
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() { 
     PGXADataSource pgXADataSource = new PGXADataSource(); 
     pgXADataSource.setUrl(gisConfig.getGdbUrl()); 
     pgXADataSource.setUser(gisConfig.getGdbUser()); 
     pgXADataSource.setPassword(gisConfig.getGdbPassword()); 

     AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); 
     xaDataSource.setXaDataSource(pgXADataSource); 
     xaDataSource.setUniqueResourceName("gdb"); 
     xaDataSource.setPoolSize(gisConfig.getGdbPoolSize()); 
     return xaDataSource; 
    } 

    @Bean 
    DSLContext dslContext() { 
     DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES); 
     return dslContext; 
    } 

    @Bean(initMethod = "init", destroyMethod = "close") 
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() { 
     ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(); 
     activeMQXAConnectionFactory.setBrokerURL(gisConfig.getMomBrokerUrl()); 

     AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean(); 
     atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker"); 
     atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory); 
     atomikosConnectionFactoryBean.setLocalTransactionMode(false); 
     return atomikosConnectionFactoryBean; 
    } 

    @Bean 
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException { 
     DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer(); 
     messageSource.setTransactionManager(transactionManager()); 
     messageSource.setConnectionFactory(atomikosJmsConnectionFactory()); 
     messageSource.setSessionTransacted(true); 
     messageSource.setConcurrentConsumers(1); 
     messageSource.setReceiveTimeout(gisConfig.getMomQueueGdmTimeoutReceive()); 
     messageSource.setDestinationName(gisConfig.getMomQueueGdmName()); 
     messageSource.setMessageListener(context.getBean("portSIQueue")); 
     return messageSource; 
    } 

    @Bean 
    JmsTemplate queueWrapperLIMS() { 
     JmsTemplate jmsTemplate = new JmsTemplate(); 
     jmsTemplate.setConnectionFactory(atomikosJmsConnectionFactory()); 
     jmsTemplate.setDefaultDestinationName(gisConfig.getMomQueueLimsName()); 
     jmsTemplate.setSessionTransacted(true); 
     return jmsTemplate; 
    } 

} 

package com.sm.gis.gdm.ports; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.stereotype.Component; 
import org.springframework.transaction.annotation.Transactional; 

import com.sm.gis.gdm.kernel.Kernel; 
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler; 
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS; 

@Component 
public class PortSIQueue implements MessageListener { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    GisMessageMarshaler    queueMessageMashaler; 
    @Autowired 
    Kernel       kernel; 

    @Override 
    @Transactional(rollbackFor = {Throwable.class}) 
    public void onMessage(Message jmsMessage) { 

     TextMessage jmsTextMessage = (TextMessage) jmsMessage; 

     // Extract JMS message body... 
     String jmsPayload = ""; 
     try { 
      jmsPayload = jmsTextMessage.getText(); 
     } catch (JMSException e) { 
      throw new RuntimeException(e); 
     } 

     // Marshal XML text to object... 
     Object gisMessage = queueMessageMashaler.toObject(jmsPayload); 

     kernel.receiveCreateGenomicTestOrderInGIS((CreateGenomicTestOrderInGIS) gisMessage); 
    } 

} 

package com.sm.gis.gdm.kernel; 

import org.jooq.DSLContext; 
import org.jooq.impl.DSL; 

@Component 
public class Kernel { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    DSLContext      dslContext; 

<snip> 
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) { 

      dslContext.insertInto(table1) 
       .set(...) 
       .set(...) 
      .execute(); 

      dslContext.insertInto(table2) 
       .set(...) 
       .set(...) 
      .execute(); 
    } 
<snip> 
} 

+0

为什么你努力不使用Spring启动? Spring Boot 1.2具有JTA支持和开箱即用的Atomikos自动检测功能。你正在努力不要使用和规避Spring Boot的工作,而不是反对它。 –

+0

[有关记录,这也正在讨论jOOQ用户组](https://groups.google.com/forum/#!topic/jooq-user/geYotcwEcRM) –

+0

我不想绕过Sprint Boot。我相信我遵循Spring文档和Atomkos,JOOQ和ActiveMQ网站上的相关材料所述的使用Spring Boot 1.2和Atomikos的说明。这就是说,我并不是想为任何人或我创造麻烦。 :)也许我误解了一些东西。这就是我发布这个问题的原因。 –

回答

0

一直在使用的事务性注释类似的问题。必须在try/catch中使用(begin..commit)/ rollback来显式处理事务。不是很优雅和重复性,但工作。 TransactionContext保存在当前线程中。所以你的begin方法不需要返回ctx对象。 TransactionContext可以使用你的DSLContext.configuration()来实例化。

公用类DataSourceTransactionProvider实现TransactionProvider私有最终DataSourceTransactionManager txMgr;

@Inject 
public DataSourceTransactionProvider(DataSourceTransactionManager transactionManager) { 
    this.txMgr = transactionManager; 
} 

@Override 
public void begin(TransactionContext ctx) throws DataAccessException { 
    TransactionStatus transactionStatus = txMgr.getTransaction(new DefaultTransactionDefinition(TransactionDefinition.PROPAGATION_NESTED)); 
    ctx.transaction(new DBTransaction(transactionStatus)); 
} 

@Override 
public void commit(TransactionContext ctx) throws DataAccessException { 
    txMgr.commit(((DBTransaction) ctx.transaction()).transactionStatus); 
} 

@Override 
public void rollback(TransactionContext ctx) throws DataAccessException { 
    txMgr.rollback(((DBTransaction) ctx.transaction()).transactionStatus); 
} 

}

1

我是个白痴。 原来的问题是由于我的应用程序逻辑中存在缺陷。 如果第一次处理消息的尝试失败且发生异常,则ActiveMQ组件会重试消息。为第一次尝试创建的事务正确回滚。这是第二次成功的尝试。重试成功,因为数据库序列号在第一次尝试期间由应用程序逻辑递增,而第二次尝试不会导致重复密钥违规。在纠正了应用程序逻辑缺陷之后,由于在我的应用程序中,无论如何都没有消息可重试,我也关闭了重试。 我很抱歉浪费阅读我帖子的人的时间。

一路走来,我确实对实现进行了一些更改。这些更改使某些默认值显式选择。我将这些修改留下了,因为我相信他们会让我的团队中的其他开发人员更容易理解发生更快的事情。我还留下了JOOQ例外翻译代码,因为在其他情况下需要使用JOOQ例外翻译代码,并且无论如何似乎都是最佳做法。

我已将修改过的代码包含在本文中,以防其他人可能觉得它有用。


package com.sm.gis.gdm; 

import javax.transaction.SystemException; 
import javax.transaction.UserTransaction; 

import org.apache.activemq.ActiveMQXAConnectionFactory; 
import org.apache.activemq.RedeliveryPolicy; 
import org.jooq.DSLContext; 
import org.jooq.SQLDialect; 
import org.jooq.impl.DefaultConfiguration; 
import org.jooq.impl.DefaultDSLContext; 
import org.jooq.impl.DefaultExecuteListenerProvider; 
import org.postgresql.xa.PGXADataSource; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.DependsOn; 
import org.springframework.jms.annotation.EnableJms; 
import org.springframework.jms.core.JmsTemplate; 
import org.springframework.jms.listener.DefaultMessageListenerContainer; 
import org.springframework.transaction.PlatformTransactionManager; 
import org.springframework.transaction.annotation.EnableTransactionManagement; 
import org.springframework.transaction.jta.JtaTransactionManager; 

import com.atomikos.icatch.jta.UserTransactionImp; 
import com.atomikos.icatch.jta.UserTransactionManager; 
import com.atomikos.jms.AtomikosConnectionFactoryBean; 
import com.sm.gis.config.GisConfig; 

@SpringBootApplication 
@EnableJms 
@EnableTransactionManagement(proxyTargetClass=true) 
public class GdmServer { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    GisConfig      gisConfig; 

    /** 
    * Starts the GDM Server 
    */ 
    public static void main(String[] args) { 
     SpringApplication.run(GdmServer.class, args); 
    } 

    // ------------------------------------------------------------------------- 
    // Spring bean configurations 
    // ------------------------------------------------------------------------- 

    @Bean 
    GisConfig gisConfig() { 
     return new GisConfig(); 
    } 

    @Bean 
    @DependsOn({ "atomikosUserTransactionManager", "atomikosUserTransaction", "atomikosJdbcConnectionFactory", "atomikosJmsConnectionFactory" }) 
    PlatformTransactionManager transactionManager() throws SystemException { 
     JtaTransactionManager manager = new JtaTransactionManager(); 
     manager.setTransactionManager(atomikosUserTransactionManager()); 
     manager.setUserTransaction(atomikosUserTransaction()); 
     manager.setAllowCustomIsolationLevels(true); 
     manager.afterPropertiesSet(); 
     return manager; 
    } 

    @Bean(initMethod = "init", destroyMethod = "close") 
    UserTransactionManager atomikosUserTransactionManager() throws SystemException { 
     UserTransactionManager manager = new UserTransactionManager(); 
     manager.setStartupTransactionService(true); 
     manager.setForceShutdown(false); 
     manager.setTransactionTimeout(gisConfig.getTxnTimeout()); 
     return manager; 
    } 

    @Bean 
    UserTransaction atomikosUserTransaction() { 
     return new UserTransactionImp(); 
    } 

    @Bean(initMethod = "init", destroyMethod = "close") 
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception { 
     PGXADataSource pgXADataSource = new PGXADataSource(); 
     pgXADataSource.setUrl(gisConfig.getGdbUrl()); 
     pgXADataSource.setUser(gisConfig.getGdbUser()); 
     pgXADataSource.setPassword(gisConfig.getGdbPassword()); 

     AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); 
     xaDataSource.setXaDataSource(pgXADataSource); 
     xaDataSource.setUniqueResourceName("gdb"); 
     xaDataSource.setPoolSize(gisConfig.getGdbPoolSize()); 
     xaDataSource.setTestQuery("SELECT 1"); 
     xaDataSource.afterPropertiesSet(); 
     return xaDataSource; 
    } 

    @Bean 
    @DependsOn({ "atomikosJdbcConnectionFactory" }) 
    DSLContext dslContext() throws Exception { 
     DefaultConfiguration jooqConfiguration = new DefaultConfiguration(); 
     jooqConfiguration.set(SQLDialect.POSTGRES_9_4); 
     jooqConfiguration.set(atomikosJdbcConnectionFactory()); 
     jooqConfiguration.set(new DefaultExecuteListenerProvider(new JooqToSpringExceptionTransformer())); 
     DSLContext dslContext = new DefaultDSLContext(jooqConfiguration); 
     return dslContext; 
    } 


    @Bean(initMethod = "init", destroyMethod = "close") 
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() { 
     RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
     redeliveryPolicy.setInitialRedeliveryDelay(0); 
     redeliveryPolicy.setRedeliveryDelay(0); 
     redeliveryPolicy.setUseExponentialBackOff(false); 
     redeliveryPolicy.setMaximumRedeliveries(0); 

     ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(); 
     activeMQXAConnectionFactory.setBrokerURL(gisConfig.getMomBrokerUrl()); 
     activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 

     AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean(); 
     atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker"); 
     atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory); 
     atomikosConnectionFactoryBean.setLocalTransactionMode(false); 
     return atomikosConnectionFactoryBean; 
    } 

    @Bean 
    @DependsOn({ "transactionManager" }) 
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException { 
     DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer(); 
     messageSource.setTransactionManager(transactionManager()); 
     messageSource.setConnectionFactory(atomikosJmsConnectionFactory()); 
     messageSource.setSessionTransacted(true); 
     messageSource.setSessionAcknowledgeMode(0); 
     messageSource.setConcurrentConsumers(1); 
     messageSource.setReceiveTimeout(gisConfig.getMomQueueGdmTimeoutReceive()); 
     messageSource.setDestinationName(gisConfig.getMomQueueGdmName()); 
     messageSource.setMessageListener(context.getBean("portSIQueue")); 
     messageSource.afterPropertiesSet(); 
     return messageSource; 
    } 

    @Bean 
    @DependsOn({ "transactionManager" }) 
    JmsTemplate queueWrapperLIMS() { 
     JmsTemplate jmsTemplate = new JmsTemplate(); 
     jmsTemplate.setConnectionFactory(atomikosJmsConnectionFactory()); 
     jmsTemplate.setDefaultDestinationName(gisConfig.getMomQueueLimsName()); 
     jmsTemplate.setSessionTransacted(true); 
     jmsTemplate.setSessionAcknowledgeMode(0); 
     return jmsTemplate; 
    } 

} 

package com.sm.gis.gdm.ports; 

import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageListener; 
import javax.jms.TextMessage; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.stereotype.Component; 
import org.springframework.transaction.annotation.Propagation; 
import org.springframework.transaction.annotation.Transactional; 

import com.sm.gis.gdm.kernel.Kernel; 
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler; 
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS; 

@Component 
public class PortSIQueue implements MessageListener { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    GisMessageMarshaler    queueMessageMashaler; 
    @Autowired 
    Kernel       kernel; 

    @Override 
    @Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Throwable.class}) 
    public void onMessage(Message jmsMessage) { 

     TextMessage jmsTextMessage = (TextMessage) jmsMessage; 

     // Extract JMS message body... 
     String jmsPayload = ""; 
     try { 
      jmsPayload = jmsTextMessage.getText(); 
     } catch (JMSException e) { 
      throw new RuntimeException(e); 
     } 

     // Marshal XML text to object... 
     Object gisMessage = queueMessageMashaler.toObject(jmsPayload); 

     kernel.receiveCreateGenomicTestOrderInGIS((CreateGenomicTestOrderInGIS) gisMessage); 

} 

package com.sm.gis.gdm.kernel; 

import org.jooq.DSLContext; 

@Component 
public class Kernel { 

    @Autowired 
    ConfigurableApplicationContext context; 
    @Autowired 
    DSLContext      dslContext; 

<snip> 
    public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) { 

     dslContext.insertInto(table1) 
      .set(...) 
      .set(...) 
     .execute(); 

     dslContext.insertInto(table2) 
      .set(...) 
      .set(...) 
     .execute(); 
    } 
<snip> 
}