2012-11-21 41 views
1

我有一个JMS生成,其生成每秒许多消息,这些消息被发送到AMQ永久队列和由单个消费者,这需要顺序地处理它们被消耗。但看起来制片人比消费者要快得多,而且我有性能和内存问题。消息被拿来非常非常缓慢和消耗似乎发生在区间(消费者的“要求”在投票的方式,这是奇怪的消息?)jms和spring集成的性能问题。以下配置有什么问题?

基本上发生的一切与Spring集成。这是生产者一方的配置。首创股份的消息进来stakesInMemoryChannel,从那里,他们被过滤掷filteredStakesChannel,并从那里,他们会到JMS队列

<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg name="name" value="${jms.stakes.queue.name}" /> 
    </bean> 

    <int:channel id="stakesInMemoryChannel" /> 

    <int:channel id="filteredStakesChannel" > 
     <int:dispatcher task-executor="taskExecutor"/> 
    </int:channel> 

    <bean id="stakeFilterService" class="cayetano.games.stake.StakeFilterService"/> 

    <int:filter 
     input-channel="stakesInMemoryChannel" 
     output-channel="filteredStakesChannel" 
     throw-exception-on-rejection="false" 
     expression="true"/> 

    <jms:outbound-channel-adapter channel="filteredStakesChannel" destination="stakesQueue" delivery-persistent="true" explicit-qos-enabled="true" /> 

    <task:executor id="taskExecutor" pool-size="100" /> 

其他应用程序消耗(因此发送在单独的线程会发生使用执行人)这样的消息......这些消息来自jms stakesQueue的stakesInputChannel,之后它们被路由到2个独立的频道,一个持续发送消息,另一个做一些其他的事情,让我们称之为“处理”。

<bean id="stakesQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <constructor-arg name="name" value="${jms.stakes.queue.name}" /> 
</bean> 

<jms:message-driven-channel-adapter 
    channel="stakesInputChannel" 
    destination="stakesQueue" 
    acknowledge="auto" 
    concurrent-consumers="1" 
    max-concurrent-consumers="1" 
    /> 
<int:publish-subscribe-channel id="stakesInputChannel" /> 
<int:channel id="persistStakesChannel" /> 
<int:channel id="processStakesChannel" /> 

<int:recipient-list-router 
     id="customRouter" 
     input-channel="stakesInputChannel" 
     timeout="3000" 
     ignore-send-failures="true" 
     apply-sequence="true" 
     > 
    <int:recipient channel="persistStakesChannel"/> 
    <int:recipient channel="processStakesChannel"/> 
</int:recipient-list-router> 


<bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"> 
    <property name="queuePrefetch" value="${jms.broker.prefetch.policy}" /> 
</bean> 

<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory"> 
     <bean class="org.apache.activemq.ActiveMQConnectionFactory"> 
      <property name="brokerURL" value="${jms.broker.url}" />  
      <property name="prefetchPolicy" ref="prefetchPolicy" /> 
      <property name="optimizeAcknowledge" value="true" /> 
      <property name="useAsyncSend" value="true" /> 
     </bean> 
    </property> 
    <property name="sessionCacheSize" value="10"/> 
    <property name="cacheProducers" value="false"/> 
</bean> 

回答

1

不知道你的意思是“在区间(消费者的‘要求’在投票的方式,这是奇怪的消息”。

容器线程可能“看”像他们投票,但它们不是;它们在AMQ客户端中阻塞,直到消息到达或超时;当超时时,它立即返回到AMQ接收()

这个配置看起来很好;有一个线程,消耗率将直接取决于你在路由器下游做什么。

+0

我创建JMS用JMeter,它直接将消息发送到ActiveMQ的计划。我发送了更多的消息 - 每20毫秒发送一个消息 - 消费者应用程序正在处理它们,没有任何延迟......所以,问题必须在生产者配置中。 – user358448

0

建议使用PooledConnectionFactory。建议使用Spring JmsTemplate,并将它连接到Connection,Session和MessageProducer实例,以便在不再使用它们时返回它们。

我相信“间隔”的行为,你所看到的在消费者一方是消费者超时。

相反的是,加里·拉塞尔说:amq.receive()的确有效队列的查询。 Spring配置隐藏了这个,但是消息基本上是从队列中拉出队列的消费者的循环中的一个循环中。队列的使用者无法知道消息是否在队列中,直到它调用来尝试获取消息。

这是与一个主题,在这里当注册消息来在执行动作的听者相对的。主题是一个很好的解决方案,因为你注册处理消息的监听器。

与一个主题,你告诉的ActiveMQ做什么用的消息,与队列,ActiveMQ的只是当你问它给你的消息。

+0

谢谢,我将与池连接工厂改变它...我会重复我对加里说...... 我创建JMS计划用JMeter,它直接将消息发送到ActiveMQ的。我发送了更多的消息 - 每20毫秒一次 - 消费者应用程序正在处理它们,没有任何延迟......所以,问题必须在生产者配置中 – user358448

+0

“......确实有效地轮询队列......“是的;我的意思是说Spring不会间隔地”轮询“队列(例如每隔几毫秒),实际上,除了在超时之后快速周转之外,线程始终坐在amq.receive()等待中消息到达,超时时间可以从默认值增加,但是需要它,所以我们可以对其他事情作出反应,例如停止容器。 –

+0

啊,是的,它不会间隔轮询它,它本质上是等效的一段时间(真)的循环。很高兴我们在同一页面上。 –