1

我正在努力做一个可运行的junit测试,用于回滚骆驼路由期间发生的操作。使用apache骆驼测试回滚事务

我有一个骆驼路线设置,在目录上侦听。它期望一个csv文件。当csv文件出现时,它会创建新的SearchAnalytics数据。它在csv文件的每行中向表中添加一个新行。

我提出的默认spring事务方法似乎不适用于骆驼路由上发生的操作。

下面的代码工作。但它会永久保存数据并且不会回滚插入。这意味着测试只会通过一次,除非我手动删除数据。

鉴于我的示例代码,我如何使其回滚事务?

我的路线是这样的

from("ftp://some__remote__ftp_dir_path") 
    .routeId("searchAnalyticsImport")    
    .choice() 
    .when(simple("${in.header.CamelFileName} contains '.csv'")) 
    .split().method("csvSplitter", "iterator").streaming() // reads the csv file returns data objects 
    .processRef("searchAnalyticsProcesser") // this some dao saves 
    .to(Queues.SOME_REQUEST) 
.end(); 

JUnit测试

@TransactionConfiguration(defaultRollback = true, transactionManager = "transactionManager") 
@RunWith(SpringJUnit4ClassRunner.class) 
@ContextConfiguration(locations = { TestAppConfig.class}) 
public class searchAnalyticsImportTest { 

    @EndpointInject(uri = "mock:sippmatcher.requestqueue?preserveMessageQos=true") 
    private MockEndpoint mockEndpointRequest; 

    @Before 
    public void setup() throws Exception { 

     camelContext.getRouteDefinition("searchAnalyticsImport").adviceWith(camelContext, new AdviceWithRouteBuilder() { 
      @Override 
      public void configure() throws Exception { 

       replaceFromWith("file://"+this.getClass().getResource("path to folder etc...")+"?noop=true"); 

       interceptSendToEndpoint(Queues.SOME_REQUEST) 
         .skipSendToOriginalEndpoint() 
         .to(mockEndpointRequest); 
      } 
     }); 
    } 

    @Test 
    public void simpleTest() throws Exception{ 

     // there are 2 results in the test csv file.. need to poll the results till it completes 
     PollWithTimeout.run("keep polling until route has been statisfied", 15000, new PollWithTimeout.Attempt() { 
      @Override 
      public boolean complete() { 
       Date dateTime1MinuteAgo = new DateTime().minusMinutes(1).toDate(); 

       Integer newSearchCount = searchAnalysiticDao.findBySearchStartedAfter(dateTime1MinuteAgo).size(); 

       System.out.println("Recently added count: " + newSearchCount); 
       return (newSearchCount == 2); 
      } 
     }); 

     mockEndpointRequest.expectedMessageCount(2); 
     mockEndpointRequest.assertIsSatisfied(); 
    } 
} 
+0

据我所见,你的路线不是交易型的。你有没有试图使其成交(http://camel.apache.org/transactional-client.html)? –

+0

我将如何修改我的路线以包含交易? –

+1

看到我以前的答案(骆驼文档)。应该像在路由定义开始处添加.transacted()一样简单。也许你必须提供你的交易管理者的名字以及.. –

回答

1

豆加入上下文(将增加javaconfig选项,将该)

如Andreas的评论部分所述,您可以添加.transacted到路由并确保您的事务管理器bean被注入到您的上下文文件中。

路线

from("ftp://some__remote__ftp_dir_path") 
    .routeId("searchAnalyticsImport") 
    .end() 
    .transacted("PROPAGATION_REQUIRED")    
    etc.... 

语境豆配置

<bean id="jmsTransactionManager" 
     class="org.springframework.jms.connection.JmsTransactionManager"> 
    <property name="connectionFactory" ref="pooledConnectionFactory" /> 
    <property name="defaultTimeout" value="30"/> 
</bean> 

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> 
    <property name="transactionManager" ref="jmsTransactionManager" /> 
    <property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" /> 
</bean> 

或者增加交易DAO

您可以在searchAnalyticsProcesser中调用的dao方法使用以下注释。事务处理bean仍然是必需的,但你可以在注释中按名称指定它。

@Transactional(
    propagation = Propagation.REQUIRED, 
    readOnly = false, 
    value="transactionManager", 
    rollbackFor = { 
     Exception.class 
     }) 
public void insertStuff()