数据库写入未按预期回滚。我花了很多时间阅读软件文档和网上的帖子,但一直无法解决这个问题。(标题:使用 Spring、jOOQ、Postgres 和 ActiveMQ 时数据库写入不会回滚)
我希望你们可以帮助我。
情景
- 我的应用程序从队列中取出一条消息,从该消息中提取数据,并将其写入数据库。
- 写入数据库的方法会执行 2 次 SQL 插入。
- 第二次插入抛出异常:org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK"(重复的键值违反了唯一约束 "table2_PK")
- 但是,第一次插入仍然被提交到数据库。
相关软件
- Spring Boot 1.2.5.RELEASE
- atomikos-util 3.9.3(来自 spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
- jooq 3.6.2
- postgresql 9.4-1201-jdbc41
- activemq-client 5.1.2
应用程序代码 - 我在下面粘贴了我的代码的相关部分。
- GdmServer - 我的“服务器”类,同时也声明了 Spring bean 配置
- PortSIQueue - 我的 JMS MessageListener 类
- Kernel - 我的工作类(worker class),即写入数据库的代码;它是一个由我的 MessageListener 调用的 Spring bean
如果有人能提供帮助,我将不胜感激。
感谢
package com.sm.gis.gdm;
import javax.transaction.SystemException;
import javax.transaction.UserTransaction;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;
/**
 * Entry point of the GDM server and holder of its Spring bean definitions.
 *
 * <p>The class wires together a JTA transaction manager backed by Atomikos, an XA-capable
 * PostgreSQL data source, a jOOQ {@code DSLContext}, an XA-capable ActiveMQ connection factory,
 * a message listener container for the inbound queue and a {@code JmsTemplate} for the outbound
 * queue. All connection settings are read from {@link GisConfig}.
 */
@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {

    /** Application context, used to look up the message listener bean by name. */
    @Autowired
    ConfigurableApplicationContext context;

    /** Source of every externally configurable value used by the bean methods below. */
    @Autowired
    GisConfig gisConfig;

    /**
     * Starts the GDM Server.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(GdmServer.class, args);
    }

    // -------------------------------------------------------------------------
    // Spring bean configurations
    // -------------------------------------------------------------------------

    /**
     * Provides the configuration holder as a bean.
     *
     * @return a freshly constructed {@link GisConfig}
     */
    @Bean
    GisConfig gisConfig() {
        return new GisConfig();
    }

    /**
     * Builds the Spring transaction manager on top of the Atomikos JTA objects defined below.
     * Custom isolation levels are allowed.
     *
     * @return the JTA-based transaction manager
     * @throws SystemException propagated from {@link #atomikosUserTransactionManager()}
     */
    @Bean
    PlatformTransactionManager transactionManager() throws SystemException {
        JtaTransactionManager jta = new JtaTransactionManager();
        jta.setTransactionManager(atomikosUserTransactionManager());
        jta.setUserTransaction(atomikosUserTransaction());
        jta.setAllowCustomIsolationLevels(true);
        return jta;
    }

    /**
     * Creates the Atomikos transaction manager. The transaction service is started with it,
     * forced shutdown is disabled, and the timeout comes from {@code GisConfig.getTxnTimeout()}.
     * {@code init} and {@code close} are registered as lifecycle callbacks on the bean.
     *
     * @return the configured Atomikos transaction manager
     * @throws SystemException if the timeout cannot be applied
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    UserTransactionManager atomikosUserTransactionManager() throws SystemException {
        UserTransactionManager atomikosTm = new UserTransactionManager();
        atomikosTm.setStartupTransactionService(true);
        atomikosTm.setForceShutdown(false);
        atomikosTm.setTransactionTimeout(gisConfig.getTxnTimeout());
        return atomikosTm;
    }

    /**
     * Provides the Atomikos {@code UserTransaction} implementation.
     *
     * @return a new {@link UserTransactionImp}
     */
    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }

    /**
     * Creates the pooled XA data source for the database. URL, credentials and pool size are
     * taken from {@link GisConfig}; the Atomikos resource name is {@code "gdb"}.
     *
     * @return the Atomikos data source wrapping a {@link PGXADataSource}
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
        PGXADataSource postgresXa = new PGXADataSource();
        postgresXa.setUrl(gisConfig.getGdbUrl());
        postgresXa.setUser(gisConfig.getGdbUser());
        postgresXa.setPassword(gisConfig.getGdbPassword());

        AtomikosDataSourceBean pooled = new AtomikosDataSourceBean();
        pooled.setXaDataSource(postgresXa);
        pooled.setUniqueResourceName("gdb");
        pooled.setPoolSize(gisConfig.getGdbPoolSize());
        return pooled;
    }

    /**
     * Creates the jOOQ context on top of the Atomikos data source, using the PostgreSQL dialect.
     *
     * <p>NOTE(review): the data source is handed to jOOQ as-is. Whether statements issued through
     * this context join the transaction that surrounds the message listener should be confirmed;
     * Spring documents {@code TransactionAwareDataSourceProxy} as the way to bind a plain
     * {@code DataSource} to Spring-managed transactions.
     *
     * @return the jOOQ entry point used by the application
     */
    @Bean
    DSLContext dslContext() {
        return DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
    }

    /**
     * Creates the XA JMS connection factory for the ActiveMQ broker whose URL comes from
     * {@code GisConfig.getMomBrokerUrl()}. The Atomikos resource name is
     * {@code "activeMQBroker"} and local transaction mode is switched off.
     *
     * @return the Atomikos connection factory wrapping an {@link ActiveMQXAConnectionFactory}
     */
    @Bean(initMethod = "init", destroyMethod = "close")
    AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
        ActiveMQXAConnectionFactory brokerXa = new ActiveMQXAConnectionFactory();
        brokerXa.setBrokerURL(gisConfig.getMomBrokerUrl());

        AtomikosConnectionFactoryBean jmsFactory = new AtomikosConnectionFactoryBean();
        jmsFactory.setUniqueResourceName("activeMQBroker");
        jmsFactory.setXaConnectionFactory(brokerXa);
        jmsFactory.setLocalTransactionMode(false);
        return jmsFactory;
    }

    /**
     * Creates the listener container for the inbound GDM queue. It uses the JTA transaction
     * manager, a transacted session, a single concurrent consumer, and dispatches to the bean
     * registered under the name {@code "portSIQueue"}.
     *
     * @return the configured listener container
     * @throws SystemException propagated from {@link #transactionManager()}
     */
    @Bean
    DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setTransactionManager(transactionManager());
        container.setConnectionFactory(atomikosJmsConnectionFactory());
        container.setSessionTransacted(true);
        container.setConcurrentConsumers(1);
        container.setReceiveTimeout(gisConfig.getMomQueueGdmTimeoutReceive());
        container.setDestinationName(gisConfig.getMomQueueGdmName());
        container.setMessageListener(context.getBean("portSIQueue"));
        return container;
    }

    /**
     * Creates the template used to send to the LIMS queue, with a transacted session and the
     * default destination taken from {@code GisConfig.getMomQueueLimsName()}.
     *
     * @return the configured {@link JmsTemplate}
     */
    @Bean
    JmsTemplate queueWrapperLIMS() {
        JmsTemplate limsTemplate = new JmsTemplate();
        limsTemplate.setConnectionFactory(atomikosJmsConnectionFactory());
        limsTemplate.setDefaultDestinationName(gisConfig.getMomQueueLimsName());
        limsTemplate.setSessionTransacted(true);
        return limsTemplate;
    }
}
package com.sm.gis.gdm.ports;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;
/**
 * JMS listener for the inbound queue. Each text message is unmarshalled from XML and handed to
 * the {@link Kernel} for processing.
 */
@Component
public class PortSIQueue implements MessageListener {

    @Autowired
    ConfigurableApplicationContext context;

    /** Converts the XML payload of a message into its object form. */
    @Autowired
    GisMessageMarshaler queueMessageMashaler;

    /** Receives the unmarshalled message. */
    @Autowired
    Kernel kernel;

    /**
     * Handles one JMS message: reads its text body, unmarshals it and passes the result to
     * {@code Kernel.receiveCreateGenomicTestOrderInGIS}. Declared transactional with rollback
     * for any {@code Throwable}.
     *
     * @param jmsMessage the delivered message; it is cast to {@link TextMessage} unconditionally
     * @throws RuntimeException wrapping the {@link JMSException} if the body cannot be read
     */
    @Override
    @Transactional(rollbackFor = {Throwable.class})
    public void onMessage(Message jmsMessage) {
        final String xmlBody = readBody((TextMessage) jmsMessage);

        // Marshal XML text to object, then narrow it to the expected message type.
        final CreateGenomicTestOrderInGIS order =
                (CreateGenomicTestOrderInGIS) queueMessageMashaler.toObject(xmlBody);

        kernel.receiveCreateGenomicTestOrderInGIS(order);
    }

    /** Extracts the text body, converting the checked JMS failure into an unchecked one. */
    private static String readBody(TextMessage textMessage) {
        try {
            return textMessage.getText();
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}
package com.sm.gis.gdm.kernel;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
/**
 * Spring component that writes received order data to the database through the injected jOOQ
 * {@code DSLContext}. Parts of the class are elided in this excerpt (marked with snip).
 */
@Component
public class Kernel {
// Application context injected by Spring; not referenced in the visible part of this class.
@Autowired
ConfigurableApplicationContext context;
// jOOQ entry point used to issue the insert statements below.
@Autowired
DSLContext dslContext;
<snip>
/**
 * Persists a received order by executing two inserts one after the other: first into
 * {@code table1}, then into {@code table2}. The column assignments are elided in this excerpt.
 *
 * <p>This method carries no transaction annotation of its own; the visible caller,
 * {@code PortSIQueue.onMessage}, is annotated {@code @Transactional}.
 *
 * @param message the unmarshalled order message to persist
 */
public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {
// First insert.
dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();
// Second insert; runs only if the first execute() returned normally.
dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}
你为什么要这么费力地不去使用 Spring Boot?Spring Boot 1.2 开箱即用地提供了 JTA 支持和 Atomikos 自动检测功能。你是在费力地绕开 Spring Boot,而不是配合它来工作。 –
[有关记录,这也正在讨论jOOQ用户组](https://groups.google.com/forum/#!topic/jooq-user/geYotcwEcRM) –
我并不想绕过 Spring Boot。我相信我是按照 Spring 文档以及 Atomikos、jOOQ 和 ActiveMQ 网站上的相关材料中所述的说明来使用 Spring Boot 1.2 和 Atomikos 的。话虽如此,我并不想给任何人(包括我自己)添麻烦。 :) 也许我误解了什么,这正是我发布这个问题的原因。 –