2014-04-15 72 views
1

我有一个超级简单的场景:一个经纪人和一个持久订阅的消费者。 这是我的消费者应用程序的代码:ActiveMQ故障转移似乎不起作用

package test; 

import javax.jms.Connection; 
import javax.jms.ConnectionFactory; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.jms.Topic; 

import org.apache.activemq.ActiveMQConnectionFactory; 

import pojo.Event; 
import pojo.StockUpdate; 

public class Consumer 
{ 

    private static transient ConnectionFactory factory; 
    private transient Connection connection; 
    private transient Session session; 
    public static int counter = 0; 

    public Consumer(String brokerURL) throws JMSException 
    { 
     factory = new ActiveMQConnectionFactory(brokerURL); 
     connection = factory.createConnection(); 
     connection.setClientID("CLUSTER_CLIENT_1"); 
     connection.start(); 
     session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
    } 

    public void close() throws JMSException 
    { 
     if (connection != null) 
     { 
      connection.close(); 
     } 
    } 

    public static void main(String[] args) throws JMSException 
    { 

     try 
     { 
      // extract topics from the rest of arguments 
      String[] topics = new String[2]; 
      topics[0] = "CSCO"; 
      topics[1] = "ORCL"; 

      // define connection URI 
      Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true"); 

      for (String stock : topics) 
      { 
       try 
       { 
        Destination destination = consumer.getSession().createTopic("STOCKS." + stock); 
        // consumer.getSession(). 
        MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic) destination, "STOCKS_DURABLE_CONSUMER_" + stock); 
        messageConsumer.setMessageListener(new Listener()); 
       } 
       catch (JMSException e) 
       { 
        e.printStackTrace(); 
       } 
      } 
     } 
     catch (Throwable t) 
     { 
      t.printStackTrace(); 
     } 

    } 

    public Session getSession() 
    { 
     return session; 
    } 

} 

class Listener implements MessageListener 
{ 

    public void onMessage(Message message) 
    { 
     try 
     { 
      TextMessage textMessage = (TextMessage) message; 
      String json = textMessage.getText(); 
      Event event = StockUpdate.fromJSON(json, StockUpdate.class); 
      System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event); 
     } 
     catch (Exception e) 
     { 
      e.printStackTrace(); 
     } 
    } 

} 

这里是我的activemq.xml中

<beans 
    xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> 

    <!-- Allows us to use system properties as variables in this configuration file --> 
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
     <property name="locations"> 
      <value>file:${activemq.conf}/credentials.properties</value> 
     </property> 
    </bean> 

    <!-- 
     The <broker> element is used to configure the ActiveMQ broker. 
    --> 
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1" persistent="true"> 

     <networkConnectors> 
      <networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/> 
     </networkConnectors> 

     <destinationPolicy> 
      <policyMap> 
       <policyEntries> 
       <policyEntry topic=">" > 
        <!-- The constantPendingMessageLimitStrategy is used to prevent 
         slow topic consumers to block producers and affect other consumers 
         by limiting the number of messages that are retained 
         For more information, see: 

         http://activemq.apache.org/slow-consumer-handling.html 

        --> 
        <pendingMessageLimitStrategy> 
        <constantPendingMessageLimitStrategy limit="1000"/> 
        </pendingMessageLimitStrategy> 
       </policyEntry> 
       </policyEntries> 
      </policyMap> 
     </destinationPolicy> 


     <!-- 
      The managementContext is used to configure how ActiveMQ is exposed in 
      JMX. By default, ActiveMQ uses the MBean server that is started by 
      the JVM. For more information, see: 

      http://activemq.apache.org/jmx.html 
     --> 
     <managementContext> 
      <managementContext createConnector="false"/> 
     </managementContext> 

     <!-- 
      Configure message persistence for the broker. The default persistence 
      mechanism is the KahaDB store (identified by the kahaDB tag). 
      For more information, see: 

      http://activemq.apache.org/persistence.html 
     --> 
     <persistenceAdapter> 
      <kahaDB directory="/work/temp/kahadb"/> 
     </persistenceAdapter> 


      <!-- 
      The systemUsage controls the maximum amount of space the broker will 
      use before disabling caching and/or slowing down producers. For more information, see: 
      http://activemq.apache.org/producer-flow-control.html 
      --> 
      <systemUsage> 
      <systemUsage> 
       <memoryUsage> 
        <memoryUsage percentOfJvmHeap="70" /> 
       </memoryUsage> 
       <storeUsage> 
        <storeUsage limit="100 gb"/> 
       </storeUsage> 
       <tempUsage> 
        <tempUsage limit="50 gb"/> 
       </tempUsage> 
      </systemUsage> 
     </systemUsage> 

     <!-- 
      The transport connectors expose ActiveMQ over a given protocol to 
      clients and other brokers. For more information, see: 

      http://activemq.apache.org/configuring-transports.html 
     --> 
     <transportConnectors> 
      <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
      <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
      <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> --> 
     </transportConnectors> 

     <!-- destroy the spring context on shutdown to stop jetty --> 
     <shutdownHooks> 
      <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
     </shutdownHooks> 

    </broker> 

    <!-- 
     Enable web consoles, REST and Ajax APIs and demos 
     The web consoles requires by default login, you can disable this in the jetty.xml file 

     Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details 
    --> 
    <import resource="jetty.xml"/> 

</beans> 

当我有两个经纪人和消费者的运行,然后停止我的消费者离开后几分钟代理。据我了解,它必须尝试重新连接,但事实并非如此。我做错了什么,请指教。

!注意!我在Eclipse中启动消费者,我不为此任务构建独立的jar。

我已经将我的经纪人更新到最新的5.9.1版本,并且对我的消费者也这样做。结果是一样的 - 在我停止经纪人后,我的消费者在几秒钟后死亡。如果经纪人正在运行,它会正常工作。

+0

我已经将我的经纪人更新至最新的5.9.1版,并且对我的消费者也做了同样的处理。结果是一样的 - 在我停止经纪人后,我的消费者在几秒钟后死亡。如果经纪人正在运行,它会正常工作。 –

回答

1

好的,问题实际上是在我的代码中:没有任何东西可以阻止主线程退出。由于实现故障转移的线程是一个守护进程线程,因此消费者应用程序在没有任何东西驻留(没有非守护进程线程)之后立即终止。

+0

我刚刚发现了同样的“问题”。感谢您确认原因! 你实施了什么样的解决方案来避免主线程终止? –

+0

据我记得,我已经添加了关机钩子。 –

0

很可能你正在使用的ActiveMQ版本有一个错误,导致所有的守护进程线程,这意味着没有什么可以保持客户端运行。升级到更高版本,如v5.9.1,看看是否有帮助。如果没有发布更多的信息,因为你没有提供太多的信息。

+0

Tim,我正在使用Apache ActiveMQ 5.9.0 您希望我发布什么额外信息? –

+0

我已更新所有信息的问题。 –

+0

我可以用'5.13.2'确认行为。 –