2013-06-21 53 views
1

我有多个客户端将文件发送到服务器。对于一组数据,有两个文件包含有关该数据的信息,每个数据都具有相同的名称。当收到一个文件时,服务器会向我的队列发送一条消息,其中包含文件路径,文件名,客户端ID以及文件的“类型”(全部文件扩展名相同,但有两种类型, “给他们打电话A和B)。使用骆驼来聚合相同邮件头的邮件

一组数据的两个文件具有相同的文件名。只要服务器收到了两个文件,我就需要启动一个合并这两个文件的程序。目前,我有一些看起来像这样:

from("jms:queue.name").aggregate(header("CamelFileName")).completionSize(2).to("exec://FILEPATH?args="); 

我在哪里卡住是标题(“CamelFileName”),更具体如何汇聚的作品。

当completionSize设置为2时,它只是吸收所有消息并将它们存储在某个数据结构中,直到与第一条消息匹配的第二条消息通过?另外,header()是否需要一个特定的值?我有多个客户端,所以我正考虑在头文件中包含客户端ID和文件名,但是我又不知道是否必须给出具体的值。我也不知道我是否可以使用正则表达式。

任何想法或提示将是超级有用。 谢谢

编辑: 这里是我现在有一些代码。根据我对这个问题的描述以及对所选答案的评论看起来是否准确(除了我没有复制过的近括号)?

public static void main(String args[]) throws Exception{ 
     CamelContext c = new DefaultCamelContext(); 
     c.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false")); 
     //ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); 
     //c.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory)); 
     c.addRoutes(new RouteBuilder() { 
      public void configure() { 
       from("activemq:queue:analytics.camelqueue").aggregate(new MyAggregationStrategy()).header("subject").completionSize(2).to("activemq:queue:analytics.success"); 
      } 
     }); 
     c.start(); 
     while (true) { 
      System.out.println("Waiting on messages to come through for camel"); 
      Thread.sleep(2 * 1000); 
     } 
     //c.stop(); 
    } 

    private static class MyAggregationStrategy implements AggregationStrategy { 

     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
      if (oldExchange == null) 
       return newExchange; 
      // and here is where combo stuff goes 
      String oldBody = oldExchange.getIn().getBody(String.class); 
      String newBody = newExchange.getIn().getBody(String.class); 
      boolean oldSet = oldBody.contains("set"); 
      boolean newSet = newBody.contains("set"); 
      boolean oldFlow = oldBody.contains("flow"); 
      boolean newFlow = newBody.contains("flow"); 
      if ((oldSet && newFlow) || (oldFlow && newSet)) { 
       //they match so return new exchange with info so extractor can be started with exec 
       String combined = oldBody + "\n" + newBody + "\n"; 
       newExchange.getIn().setBody(combined); 
       return newExchange; 
      } 
      else { 
       // no match so do something.... 
       return null; 
      } 
     } 
    } 

回答

3

你必须提供一个AggregationStrategy来定义你想怎么交易所合并...

,如果你只对文件名感兴趣,并接收正好2的交流,那么你可以使用UseLatestAggregationStrategy只通过最新的交易所一次2已'汇总'...

说,它听起来像你需要保留两个交易所(一个为每个clientId),所以你可以传递信息到'exec'步骤...如果是这样,您可以使用内置的聚合策略enab将交易所合并到GroupedExchange持有者中通过groupExchanges选项引导...或者指定一个自定义的AggregationStrategy来组合它们,但是您可以选择。只需要记住,你的“高管”一步需要处理您决定使用什么聚合结构...

看到这些单元测试的例子:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTest.java

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java

+0

所以AggregationStrategy用于以某种方式组合消息,然后将其提供给'exec'? “两个交易所”是什么意思?我刚刚开始使用Camel,所以对我来说一切都还是相当新的np,欢迎来到骆驼......而且Aggregator是相当复杂/强大的工具......简而言之,Exchange包装一条消息(来自你的队列等) ,所以如果你在等待2条消息(由fileName关联),你最终会得到2个交换组合在一起...然后你需要将相关数据从这些交换到exec(fileName,clientIDs, etc)... – thaweatherman

+0

虽然文件名可以从不同的客户端得到,但是我可以通过ID和文件名关联起来吗? – thaweatherman

+0

对不起,我错误地使用了你的用例...正确的,如果你想为每个客户端有不同的组合,你应该把聚合表达式作为fileName + clientID的组合。 –