2014-08-29 50 views
1

尝试让JMS MessageConsumer在ActiveMQ重新启动后继续存在,因此可以使用故障切换传输协议重新连接。使用MessageListener的JMS MessageConsumer在ActiveMQ关闭时终止

但是,它会在ActiveMQ关闭时终止。

这看起来像报道和“解决”的错误,但我仍然在ActiveMQ中5.10.0的最新版本看到这

我用下面的行家依赖

<dependency> 
     <groupId>org.apache.activemq</groupId> 
     <artifactId>activemq-all</artifactId> 
     <version>5.10.0</version> 
    </dependency> 

下面是使用一些示例代码

public class SimpleConsumer { 

public static void main(String[] args) throws Exception { 
    String url = "failover:(tcp://ACTIVE_MQ_HOST:61616)"; 
    String destination = "test-topic"; 

    TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
      url); 

    ActiveMQConnection connection = (ActiveMQConnection) connectionFactory 
      .createConnection(); 

    Session session = connection.createSession(false, 
      Session.AUTO_ACKNOWLEDGE); 

    Topic topic = session.createTopic(destination); 

    MessageConsumer consumer = session.createConsumer(topic); 
    connection.start(); 

// Uncomment these lines and comment out the lines below and it will work 
//  while (true) { 
//   Message msg = consumer.receive(); 
//   if (msg instanceof TextMessage) { 
//    System.out.println("msg received = " + msg); 
//   } 
//  } 

    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message msg) { 
      System.out.println("msg received = " + msg); 
     } 

    }); 

} 

}

我会喜欢它与MessageListener一起工作,它是非阻塞和异步的。

任何帮助,这是非常感谢。

我已经尝试了上面JIRA报告中提到的一些东西,它是在非守护线程中运行这个,但那不起作用。

我想这

public class SimpleConsumerThread { 

    public static void main(String[] args) throws Exception { 


     Thread t = new Thread() { 
      public void run() { 
       try {    
        String url = "failover:(tcp://ACTIVEMQ_HOST:61616)"; 
        String destination = "test-topic"; 

        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 

        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); 

        Session session = connection.createSession(false, 
        Session.AUTO_ACKNOWLEDGE); 

        Topic topic = session.createTopic(destination); 

        MessageConsumer consumer = session.createConsumer(topic); 
        connection.start(); 
        consumer.setMessageListener(new MessageListener() { 

         public void onMessage(Message msg) { 
          System.out.println("msg received = " + msg); 
         } 

        }); 
       } catch (JMSException e) { 
        e.printStackTrace(); 
       }    
      } 
     }; 
     t.setDaemon(false); 

     t.start(); 

    } 

} 

回答

1

的原因,你的解决方案螺纹不工作是因为线程运行结束后()方法完成,然后你没有像以前那样运行非守护线程。依靠第三方库的内部线程模型来保持应用程序运行并不是一个好主意。

无论ActiveMQ客户端中存在错误或其他配置错综复杂的问题,最好的解决方案都是使用while(true)sleep()范例来让主线程保持活动状态。

1

谢谢蒂姆,

是的工作。我刚刚添加了,至少保持一个用户线程活着,以便程序不终止。

while(true) { 
     Thread.sleep(1000); 
    } 

欢呼声,

public class SimpleConsumer { 

    static Logger logger = Logger.getLogger(SimpleConsumer.class); 

    public static void main(String[] args) throws Exception { 
     String url = "failover:(tcp://sydapp057lx.fxdms.net:61615)"; 
     String destination = "test-topic"; 

     TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
       url); 

     ActiveMQConnection connection = (ActiveMQConnection) connectionFactory 
       .createConnection(); 

     connection.setExceptionListener(new ExceptionListener() { 
      public void onException(JMSException e) { 
       logger.debug("got exception = " + e); 
      } 
     }); 

     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     Topic topic = session.createTopic(destination); 

     MessageConsumer consumer = session.createConsumer(topic); 
     connection.start(); 

     consumer.setMessageListener(new MessageListener() { 

      public void onMessage(Message msg) { 
       logger.debug("msg received = " + msg); 
      } 

     }); 

     while(true) { 
      Thread.sleep(1000); 
     } 

    } 


}