2014-10-31 75 views
3

我想使用简单的处理器/异步处理器作为目的地并行处理队列中的消息。处理器每条消息需要一点时间,但每条消息都可以单独处理,因此同时处理(在健康边界内)。与ActiveMQ并行处理多条消息

我很难找到例子,特别是关于骆驼路线的xml配置。

到目前为止,我已经定义了一个线程池,路线和处理器:

<threadPool id="smallPool" threadName="MyProcessorThread" poolSize="5" maxPoolSize="50" maxQueueSize="100"/> 
<route> 
    <from uri="broker:queue:inbox" /> 
    <threads executorServiceRef="smallPool"> 
     <to uri="MyProcessor" /> 
    </threads> 
</route> 
<bean id="MyProcessor" class="com.example.java.MyProcessor" /> 

和我的处理器的样子:

public class MyProcessor implements Processor { 
    @Override 
    public void process(Exchange exchange) throws Exception { 
     Message in = exchange.getIn(); 
     String msg = in.getBody(String.class);  
     System.out.println(msg); 
     try { 
      Thread.sleep(10 * 1000); // Do something in the background 
     } catch (InterruptedException e) {} 
     System.out.println("Done!"); 
    } 
} 

不幸的是,当我发帖的队列,他们是仍然一一处理,每个延迟10秒(我的“后台任务”)。

任何人都可以指向正确的方向使用定义的线程池处理消息或解释我做错了什么?

+2

你试过concurrentConsumers?在你的例子中,这将是broker:queue:inbox?concurrentConsumers = 5(或其他)。我不记得每个消费者线程是否会使用它自己的处理器实例,但除非您在处理器中启动一个新线程,否则您将需要多个线程,因为在再次调用“from”之前,线路必须完成。 – 2014-10-31 19:04:28

+0

你看过骆驼负载平衡器吗? http://camel.apache.org/load-balancer.html – Fortyrunner 2014-10-31 21:15:17

+0

@Fortyrunner我认为负载平衡器将用于出站或端点。在这种情况下,并发消费者会做到这一点。 – 2014-11-01 06:43:59

回答

5

您应该使用concurrentConsumers选项在评论中说,

<route> 
    <from uri="broker:queue:inbox?concurrentConsumers=5" /> 
    <to uri="MyProcessor" /> 
</route> 

通知也有maxConcurrentConsumers您可以设置为使用最小/最大范围的并发消费者的,所以骆驼会自动生长/取决于收缩负载。

看到JMS文档的详细信息在

+0

有没有办法在运行时动态更改'concurrentConsumers'属性?骆驼提供这种能力?一种增加或减少运行时消费者数量的方法。也许通过JMX或HawtIO mgtm控制台。 – Tuelho 2015-11-25 18:01:04

+0

我们有什么方法可以动态改变'concurrentConcumers'吗? @Tuelho你有解决方案吗? – Kuldeep 2016-08-05 10:00:00

+1

您可以使用JMX /从hawtio Web控制台更改它 – 2016-08-07 12:33:29