2013-10-31 82 views
4

我有activemq在我的系统中使用,我看到的是以下消息: TopicSubscription:consumer = ...:等待消息游标[org.apache.activemq.broker.region .cursors.VMPendingMessageCursor @ 1684f89c]已满,达到临时使用(0%)或内存使用率(100%)限制,阻止消息添加(),等待释放资源。activemq缓慢消费者块生产者,虽然producerFlowControl是false

这是因为如果我理解正确,我的消费者速度很慢,而我的制片人速度很快。其结果是,最终我的制作人被阻止,直到消费者阅读消息并释放一些内存。我想知道的是,我的制作人没有被封锁,而且当内存已满时,旧信息正在被分散。

鉴于我的理解我已阅读​​以下配置应该做的伎俩(messageEvictionStrategy,pendingMessageLimitStrategy),但它不适合我,我不明白为什么。

由于测试的原因,我已经指定低存储空间限制低(35Mb),以使问题更快,但情况是我最终需要它,当问题发生时,activemq才放弃旧消息。

我发现在ActiveMQConnectionFactory useAsyncSend = true并指定sendTimeout中设置了一个令人不满意的解决方案。这使得生产者不被阻止,但是通过这种方式,最新的消息被丢弃,而不是旧的消息。

最后,我正在谈论非持久性话题。

任何帮助家伙会完美。下面我ActiveMQ的配置

 <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
       <policyEntry topic=">" producerFlowControl="false" memoryLimit="35 Mb"> 
       <pendingSubscriberPolicy> 
        <vmCursor /> 
       </pendingSubscriberPolicy> 
        <messageEvictionStrategy> 
         <oldestMessageEvictionStrategy/> 
        </messageEvictionStrategy> 
        <pendingMessageLimitStrategy> 
         <constantPendingMessageLimitStrategy limit="10"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 

    <systemUsage> 
     <systemUsage sendFailIfNoSpace="true"> 
      <memoryUsage> 
       <memoryUsage limit="35 mb"/> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="1 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="5000 mb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 

的ActiveMQ版本5.7.0

我使用Spring模板来发送邮件:

<bean class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/> 
    <property name="timeToLive" value="100"/> 
</bean> 

我发送javax.jms.ObjectMessage,体积小relativelly。

我在客户前提中发现问题我在应用程序中有许多应用程序,但设法重现它从1个线程不断地发送,不停地发送消息总是到同一主题。发送的消息只是一个小字符串。

我只有一个生产者,当我有1个(或更多)慢消费者时,问题似乎就会出现 - 但是一个慢消费者就够了。如果不存在缓慢的消费者,则不会出现问题。

我不认为这有什么差别,但我使用

 <transportConnectors> 
     <transportConnector name="openwire" uri="nio://0.0.0.0:33029?wireFormat.maxInactivityDuration=60000&amp;wireFormat.maxInactivityDurationInitalDelay=60000"/> 
    </transportConnectors> 
+0

难题或我不描述的东西?请告诉我,如果有什么东西是未知的或需要进一步澄清 – Alexandros

回答

1

事实证明,当使用JmsTemplate为了使异步发送,然后消息无法传递,我们需要启用explicitQosEnabled并设置deliveryMode = 1(非持久性)。 同样在客户端,消费者需要有一个较小的预取限制

服务器

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="pooledJmsConnectionFactory"/> 
    <property name="explicitQosEnabled" value="true"/> 
    <property name="deliveryMode" value="1"/> 
</bean> 

客户

<jms:listener-container .. prefetch="1000"> 
... 
</jms:listener-container> 

不要问我为什么...但是这似乎已经解决了我的问题。 基本上不是100%需要,但如果有人能向我解释这将是完美

2

如何我可以重新呢?有多少生产者/消费者参与这个主题?这只是一个主题吗?

您的设置看起来不错,但您不需要在策略上设置memoryLimit = 35mb。将它设置为与整个系统使用情况相同是没有意义的。这个想法是所有主题组合的内存限制等于系统内存限制。例如,如果您有两个主题,则每个主题都使用35MB(2 * 35 == 70MB),这会超过整个系统内存设置。我不认为这是你所看到的具体原因,但要记住。

也是什么版本的ActiveMQ是这样的?如果你已经写了一些可以产生这种结果的测试,请告诉我。

+0

在上面的主要问题中添加回答你的问题 – Alexandros

+0

memoryLimit是的,我读了它并不需要,因为你上面描述,我正在阅读activemq配置,并试图使其工作所以我加了它,当你提到它存在或不存在时它没有区别。 – Alexandros

+0

我可以通过一种方式进一步调查,我可以下载源代码并检查某些内容,但不知道在哪里放置断点。我真的坚持这个问题,需要帮助。 也是一个额外的信息,可能是有用的,我使用timeToLive = 100的JMSTemplate。 – Alexandros