2016-09-09 64 views
1

我使用Spring 4.x的DefaultJmsListenerContainerFactory来连接到ActiveMQ队列,使用@JmsListener处理来自该队列的消息,然后将消息推送到同一ActiveMQ代理上的主题。DefaultJmsListenerContainerFactory和并发连接不关闭

我为消费者/监听者和生产者使用单个缓存连接工厂,并且我将缓存使用者设置为false,这样我就可以缓存生产者,而不是使用者。我还将并发性设置为1-3,我预计在应用程序启动时队列中将有至少1个消费者,并且随着消息增加,消费者数量将达到3.但是,消息越来越少,我预计消费者的数量也会回落到1。但是,如果我看看线程(defaultmessagelistenercontainer-2/3),它们处于等待状态,并且它们不关闭。当负载消退时,预计消费者的数量是否也会关闭,这不是预期的行为吗?请参阅下面的配置,让我知道这种行为是不是开箱即用,并且如果我需要添加一些内容,以便按照上面的说明进行操作。

ApplicationContext.java

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

@Bean 
public CachingConnectionFactory connectionFactory(){ 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class)); 
    redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class)); 
    redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class)); 
    redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class)); 
    redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class)); 

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint")); 
    activeMQ.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQ.setPrefetchPolicy(prefetchPolicy()); 

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ); 
    cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class)); 
    cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class)); 
    return cachingConnectionFactory; 
} 

@Bean 
public JmsMessagingTemplate jmsMessagingTemplate(){ 
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out")); 

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory()); 
    jmsMessagingTemplate.setDefaultDestination(activeMQ); 

    return jmsMessagingTemplate; 
} 

application.properties

jms.connections.concurrent=1-3 
jms.connections.prefetch=1000 
jms.connections.transacted=true 
jms.connections.cache.consumers=false 
jms.redelivery.initial-delay=1000 
jms.redelivery.delay=1000 
jms.redelivery.maximum=5 
jms.redelivery.use-exponential-back-off=true 
jms.redelivery.back-off-multiplier=2 
jms.cache.size=3 
jms.queue.in=in.queue 
jms.queue.out=out.queue 
jms.broker.endpoint=failover:(tcp://localhost:61616) 

回答

1

尝试设置maxMessagesPerTask > 0

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 

    factory.setConnectionFactory(connectionFactory()); 
    factory.setMaxMessagesPerTask(1); 
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent")); 
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class)); 
    return factory; 
} 

,你可以参考文档http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setMaxMessagesPerTask-int-

jms.connections.prefetch=1000意味着如果您有1000条消息在Q上等待,您将只有1个线程开始处理这1000条消息。

例如jms.connections.prefetch=1表示消息将平等地分派给所有可用的线程,但对于此设置maxMessagesPerTask < 0更好,因为长寿命任务避免了频繁的线程上下文切换。 http://activemq.apache.org/what-is-the-prefetch-limit-for.html

+0

谢谢!我试了一下,它的工作!然而,当我分析时,我注意到dmlc容器线程不断被重新创建,假设基于接收尝试和每个任务的最大消息值。这只是一个观察,任何人都可以对此发表评论? – jcb

+0

是的,我给了你一个例子,每个任务有1个最大消息,但是1个是我提供的文档链接中说的太低,每个线程在1次尝试后都会死掉。您需要将max messagespertask增加到10,例如允许10次尝试,并增加IdleTaskExecutionLimit以允许在达到最大尝试次数时重新使用线程。它在文档http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int- –