2014-12-04 27 views
0

我使用Spring JMS和ActiveMQ,其中有一个将消息推送到队列的客户端,并且我有多个使用者线程正在监听和从队列中移除消息。一些消息从两位消费者的队列中排队。我不想要这种行为,并且希望确保只有一条消息仅由一个消费者线程处理。关于我出错的地方有什么想法?ActiveMQ并发问题 - 多个使用者从队列中使用相同消息

春3.2.2配置:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util" 
    xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans  
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
     http://www.springframework.org/schema/context 
     http://www.springframework.org/schema/context/spring-context-3.0.xsd 
     http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd 
     http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd"> 
    <context:annotation-config /> 
    <context:component-scan base-package="com.myapp" /> 

    <!-- JMS ConnectionFactory config Starts --> 
    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
     <property name="brokerURL"> 
      <value>${brokerURL}</value> 
     </property> 
     <property name="userName" value="${username}" /> 
     <property name="password" value="${password}" /> 
    </bean> 

    <bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" 
     init-method="start" destroy-method="stop"> 
     <property name="connectionFactory" ref="jmsConnectionFactory" /> 
    </bean> 
    <!-- JMS ConnectionFactory config Ends --> 

    <!-- JMS Template config Starts --> 
    <bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg value="${activemq.consumer.destinationName}" /> 
    </bean> 

    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 
     <property name="connectionFactory" ref="pooledJmsConnectionFactory" /> 
    </bean> 
    <!-- JMS Template config Ends --> 

    <!-- JMS Listener config starts --> 
    <bean id="simpleMessageConverter" 
     class="org.springframework.jms.support.converter.SimpleMessageConverter" /> 

    <bean id="myContainer" 
     class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="concurrentConsumers" value="${threadcount}" /> 
     <property name="connectionFactory" ref="pooledJmsConnectionFactory" /> 
     <property name="destination" ref="myQueue" /> 
     <property name="messageListener" ref="myListener" /> 
     <property name="messageSelector" value="JMSType = 'New'" /> 
    </bean> 

    <bean id="myListener" 
     class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> 
     <constructor-arg> 
      <bean class="myapp.MessageListener" /> 
     </constructor-arg> 
     <property name="defaultListenerMethod" value="receive" /> 
     <property name="messageConverter" ref="simpleMessageConverter" /> 
    </bean> 
    <!-- JMS Listener config Ends --> 


    <!-- enable the configuration of transactional behavior based on annotations --> 
    <bean id="myJMSMessageSender" class="myapp.JMSMessageSender"> 
     <property name="jmsTemplate" ref="myQueueTemplate" /> 
     <property name="jmsQueue" ref="myQueue" /> 
     <property name="messageConverter" ref="simpleMessageConverter" /> 
    </bean> 


    <bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> 
     <property name="connectionFactory" ref="pooledJmsConnectionFactory" /> 
    </bean> 

</beans> 

的ActiveMQ 5.9.1配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="instance8161" dataDirectory="${activemq.data}" persistent="false"> 

     <destinationPolicy> 
      <policyMap> 
       <policyEntries> 
       <policyEntry topic="&gt;"> 
        <!-- The constantPendingMessageLimitStrategy is used to prevent 
         slow topic consumers to block producers and affect other consumers 
         by limiting the number of messages that are retained 
         For more information, see: 

         http://activemq.apache.org/slow-consumer-handling.html 

        --> 
        <pendingMessageLimitStrategy> 
        <constantPendingMessageLimitStrategy limit="1000"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
       </policyEntries> 
      </policyMap> 
     </destinationPolicy> 

     ... <!-- rest is default ActiveMQ Config --> 
</broker> 
+0

您的消费者在确认收到两次交付的消息之前是否出错?这当然可以解释邮件重新传递,如果是这样,你必须决定是否不止一次地收到邮件更好(保证你成功处理一次)或者不要第二次收到邮件(所以你永远不会成功处理该消息)。 – Tim 2014-12-08 23:10:33

回答

1

最有可能的,你myapp.MessageListener(或它的一个依赖)不thread-安全并且您在消费者线索中看到了交叉对话。

最佳做法是将您的侦听器设计为无状态(类中没有突变字段)。如果这是不可能的,你需要用锁来保护共享变量。

+0

这些建议是避免可能导致此问题的事情之一,但消息监听器中的线程安全性问题并不是获得消息重新传递的唯一方法(也不是我的经验中最可能的),所以我会犹豫不决,说这是“最可能”的问题。这是一回事,但不要沿着这条道路排斥其他人。 – Tim 2014-12-08 23:15:56

+0

我同意;您对重新交付的评论是正确的,但是“有些时候,两个消费者将相同的消息从Queue中排队。”听起来他说的是他正在谈论的是同时交付,而不是重新传递被拒绝的信息。 – 2014-12-08 23:45:03

+0

这绝对可能是你解释它的方式。如果操作系统看到正确的消息总数,但有些会得到不止一次的处理,有些则从不处理,那么你的线程安全解释可能就是这样。如果OP看到N次以上的尝试处理N个消息,那么这可能是一种重新传递的情况(或者消息在某处重复的情况)。 – Tim 2014-12-08 23:52:53

相关问题