2015-07-13 30 views
2

我在Mule中有一个非常简单的流程,它监听ActiveMQ队列,并且如果队列中的消息包含字符串“fail”,则会引发异常。没有XA交易,一切正常。该消息在5秒后重新传送,下一次再传送几秒钟。 5次重新传送后,该消息被移至死信队列。如何在Mule XA事务中为ActiveMQ启用RedeliveryDelay

通过XA事务,消息立即被重新传递5次,然后移动到死信队列中。我如何告诉Mule或ActiveMQ等待RedeliveryDelay?

<?xml version="1.0" encoding="UTF-8"?> 

<mule xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" 
    xmlns:jbossts="http://www.mulesoft.org/schema/mule/jbossts" xmlns:jms="http://www.mulesoft.org/schema/mule/jms" 
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" 
    xmlns:spring="http://www.springframework.org/schema/beans" version="CE-3.6.1" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd 
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd 
http://www.mulesoft.org/schema/mule/jms http://www.mulesoft.org/schema/mule/jms/current/mule-jms.xsd 
http://www.mulesoft.org/schema/mule/jbossts http://www.mulesoft.org/schema/mule/jbossts/current/mule-jbossts.xsd 
http://www.mulesoft.org/schema/mule/scripting http://www.mulesoft.org/schema/mule/scripting/current/mule-scripting.xsd"> 

    <spring:beans> 

     <!-- Redelivery Policy --> 
     <spring:bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
      <spring:property name="maximumRedeliveries" value="5" /> 
      <spring:property name="initialRedeliveryDelay" value="5000" /> 
      <spring:property name="redeliveryDelay" value="5000" /> 
      <spring:property name="useExponentialBackOff" value="true" /> 
      <spring:property name="backOffMultiplier" value="1.5" /> 
     </spring:bean> 

     <!-- ActiveMQ Connection factory --> 
     <spring:bean id="amqXAFactory" 
      class="org.apache.activemq.ActiveMQXAConnectionFactory" lazy-init="true" 
      name="amqXAFactory"> 
      <spring:property name="brokerURL" 
       value="tcp://localhost:61616" /> 
      <spring:property name="redeliveryPolicy" ref="redeliveryPolicy" /> 
     </spring:bean> 

    </spring:beans> 

    <jbossts:transaction-manager doc:name="JBoss Transaction Manager"> 
     <property key="com.arjuna.ats.arjuna.coordinator.defaultTimeout" 
      value="47" /><!-- this is in seconds --> 
     <property key="com.arjuna.ats.arjuna.coordinator.txReaperTimeout" 
      value="108000" /><!-- this is in milliseconds --> 
    </jbossts:transaction-manager> 

    <jms:activemq-connector name="Active_MQconnectorMessages" 
     specification="1.1" validateConnections="true" doc:name="Active MQ for XA" 
     persistentDelivery="true" connectionFactory-ref="amqXAFactory" 
     numberOfConsumers="1" maxRedelivery="5"> 
     <reconnect-forever frequency="10000" /> 
    </jms:activemq-connector> 

    <flow name="xatestFlow"> 
     <jms:inbound-endpoint queue="xa.test" doc:name="JMS" 
      connector-ref="Active_MQconnectorMessages"> 
      <xa-transaction action="ALWAYS_BEGIN" /> 
     </jms:inbound-endpoint> 
     <logger message="Message from queue: #[payload]" level="INFO" 
      doc:name="Logger" /> 
     <scripting:component doc:name="fail if payload contains fail"> 
      <scripting:script engine="Groovy"><![CDATA[if (payload.contains("fail")) { 
    throw new IllegalArgumentException("Failed...") 
} 
]]></scripting:script> 
     </scripting:component> 
     <logger message="Message is ready" level="INFO" doc:name="Logger" /> 
    </flow> 
</mule> 

回答

1

花了一些时间,但我发现了解决方案。有两种类型的配置,ActiveMQ中的服务器端和客户端。

在客户端redeliveryPolicy我改变maximumRedeliveries为0。我在activemq.xml中创建一个redeliveryPlugin服务器:

<broker ....> 
    ...... 
    <plugins> 
     <redeliveryPlugin fallbackToDeadLetter="true" sendToDlqIfMaxRetriesExceeded="true"> 
      <redeliveryPolicyMap> 
       <redeliveryPolicyMap> 
        <redeliveryPolicyEntries> 
         <!-- a destination specific policy --> 
         <redeliveryPolicy 
          queue="amm.input.to.router" maximumRedeliveries="3" 
          redeliveryDelay="7000" 
          initialRedeliveryDelay="5000" 
          useCollisionAvoidance="true" 
          /> 
        </redeliveryPolicyEntries> 

        <!-- the fallback policy for all other destinations --> 
        <defaultEntry> 
         <redeliveryPolicy 
          maximumRedeliveries="4" 
          useExponentialBackOff="true" 
          initialRedeliveryDelay="10000" 
          redeliveryDelay="5000" 
          useCollisionAvoidance="true" 
          backOffMultiplier="1.5" 
          maximumRedeliveryDelay="93600000" /> 
        </defaultEntry> 
       </redeliveryPolicyMap> 
      </redeliveryPolicyMap> 
     </redeliveryPlugin> 
    </plugins> 

    </broker> 

现在是ActiveMQ的服务器采取重复传递,而不是客户端的照顾。

请参阅http://activemq.apache.org/message-redelivery-and-dlq-handling.html