我使用Spring JMS和ActiveMQ,其中有一个将消息推送到队列的客户端,并且我有多个使用者线程正在监听和从队列中移除消息。一些消息从两位消费者的队列中排队。我不想要这种行为,并且希望确保只有一条消息仅由一个消费者线程处理。关于我出错的地方有什么想法?ActiveMQ并发问题 - 多个使用者从队列中使用相同消息
春3.2.2配置:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.0.xsd">
<context:annotation-config />
<context:component-scan base-package="com.myapp" />
<!-- JMS ConnectionFactory config Starts -->
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<value>${brokerURL}</value>
</property>
<property name="userName" value="${username}" />
<property name="password" value="${password}" />
</bean>
<bean id="pooledJmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>
<!-- JMS ConnectionFactory config Ends -->
<!-- JMS Template config Starts -->
<bean id="myQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${activemq.consumer.destinationName}" />
</bean>
<bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
</bean>
<!-- JMS Template config Ends -->
<!-- JMS Listener config starts -->
<bean id="simpleMessageConverter"
class="org.springframework.jms.support.converter.SimpleMessageConverter" />
<bean id="myContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="concurrentConsumers" value="${threadcount}" />
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
<property name="destination" ref="myQueue" />
<property name="messageListener" ref="myListener" />
<property name="messageSelector" value="JMSType = 'New'" />
</bean>
<bean id="myListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="myapp.MessageListener" />
</constructor-arg>
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
<!-- JMS Listener config Ends -->
<!-- enable the configuration of transactional behavior based on annotations -->
<bean id="myJMSMessageSender" class="myapp.JMSMessageSender">
<property name="jmsTemplate" ref="myQueueTemplate" />
<property name="jmsQueue" ref="myQueue" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
<bean id="myQueueTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="pooledJmsConnectionFactory" />
</bean>
</beans>
的ActiveMQ 5.9.1配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="instance8161" dataDirectory="${activemq.data}" persistent="false">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<!-- The constantPendingMessageLimitStrategy is used to prevent
slow topic consumers to block producers and affect other consumers
by limiting the number of messages that are retained
For more information, see:
http://activemq.apache.org/slow-consumer-handling.html
-->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
... <!-- rest is default ActiveMQ Config -->
</broker>
您的消费者在确认收到两次交付的消息之前是否出错?这当然可以解释邮件重新传递,如果是这样,你必须决定是否不止一次地收到邮件更好(保证你成功处理一次)或者不要第二次收到邮件(所以你永远不会成功处理该消息)。 – Tim 2014-12-08 23:10:33