2013-05-29 49 views
0

我有一个路由设置为以批处理模式运行,轮询几千个XML文件。每个都在XML结构中进行时间戳,并使用此dateTime元素来确定XML是否应包含在批处理的进一步处理中(一种XQuery转换)。由于这是批量路由,它在执行后自行终止。基于XQuery筛选器设置标题

由于路由需要自行关闭,所以如果每条消息都被过滤掉,我必须确保它也关闭,这就是为什么我不使用过滤器,而是使用.choice()语句,并在交换机上设置自定义标头稍后在一个bean中使用,该bean将匹配并为XQuery准备单个源文档。

但是,我目前的做法需要第二条路线,即.choice()的两个分支转发到。这是必要的,因为我似乎无法迫使两条路径继续。所以我的问题是:如何摆脱这第二条路线?一种方法是将过滤器头部设置为一个bean,但我担心所涉及的开销。我认为Camel中的XQuery过滤器将大大超过POJO,它能够从字符串中构建XML文档并针对它运行XQuery。

from(sourcePath + "?noop=true" + "&include=.*.xml") 
     .choice() 
      .when() 
       .xquery("[XQuery Filter]") 
       .setHeader("Filtered", constant(false)) 
       .to("direct:continue") 
      .otherwise() 
       .setHeader("Filtered", constant(true)) 
       .to("direct:continue") 
.end(); 

from("direct:continue") 
     .routeId(forwarderRouteID) 
     .aggregate(aggregationExpression) 
      .completionFromBatchConsumer() 
      .completionTimeout(DEF_COMPLETION_TIMEOUT) 
      .groupExchanges() 
     .bean(new FastQueryMerger(), "group") 
     .to("xquery:" + xqueryPath) 
     .bean(new FileModifier(interval), "setFileName") 
     .to(targetPath) 
     .process(new Processor() { 
       @Override 
       public void process(Exchange exchange) throws Exception { 
        new RouteTerminator(routeID, exchange.getContext()).start(); 
        new RouteTerminator(forwarderRouteID, exchange.getContext()).start(); 
       } 
      }) 
.end(); 

回答

1

不会.end()帮助吗? 我指的是以下几点:

from(sourcePath + "?noop=true" + "&include=.*.xml") 
    .choice() 
     .when() 
      .xquery("[XQuery Filter]") 
      .setHeader("Filtered", constant(false)).end() 
     .otherwise() 
      .setHeader("Filtered", constant(true)).end() 
    .aggregate(aggregationExpression) 
     .completionFromBatchConsumer() 
     .completionTimeout(DEF_COMPLETION_TIMEOUT) 
     .groupExchanges() 
    .bean(new FastQueryMerger(), "group") 
    .to("xquery:" + xqueryPath) 
    .bean(new FileModifier(interval), "setFileName") 
    .to(targetPath) 
    .process(new Processor() { 
      @Override 
      public void process(Exchange exchange) throws Exception { 
       new RouteTerminator(routeID, exchange.getContext()).start(); 
       new RouteTerminator(forwarderRouteID, exchange.getContext()).start(); 
      } 
     }); 

只是快速地测试了以下一个和它的工作:

@Produce(uri = "direct:test") 
protected ProducerTemplate testProducer; 
@EndpointInject(uri = "mock:test-first") 
protected MockEndpoint testFirst; 
@EndpointInject(uri = "mock:test-therest") 
protected MockEndpoint testTheRest; 
@EndpointInject(uri = "mock:test-check") 
protected MockEndpoint testCheck; 

@Test 
public void test() { 
    final String first = "first"; 
    final String second = "second"; 
    testFirst.setExpectedMessageCount(1); 
    testTheRest.setExpectedMessageCount(1); 
    testCheck.setExpectedMessageCount(2); 
    testProducer.sendBody(first); 
    testProducer.sendBody(second); 
    try { 
     testFirst.assertIsSatisfied(); 
     testTheRest.assertIsSatisfied(); 
     testCheck.assertIsSatisfied(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} 

@Override 
protected RouteBuilder createRouteBuilder() { 
    return new RouteBuilder() { 
     public void configure() { 
      from("direct:test") 
       .choice() 
        .when(body().isEqualTo("first")).to("mock:test-first") 
        .otherwise().to("mock:test-therest").end() 
        .to("mock:test-check"); 
     } 
    }; 
} 
+0

我花了相当长一段时间,回到这个问题,因为我刚刚返回到该部分的项目。你给的第二个代码示例工作完美,正是我所需要的。整个'.choice()'语句末尾的简单'.end()'终止所有分支。你原来的第一段代码段有两个'.end()'语句,它们是不正确的,不能编译。我相应地修改了它。再次感谢。 – Lilienthal