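JMS MessageConsumer with a MessageListener terminates when ActiveMQ shuts down

I am trying to get a JMS MessageConsumer to survive an ActiveMQ restart, so that it can reconnect using the failover transport. However, the consumer terminates when ActiveMQ shuts down.
This looks like a bug that was reported and "resolved", but I still see it in ActiveMQ 5.10.0, the latest version.
I am using the following Maven dependency: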
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.10.0</version>
    </dependency>
Here is some sample code that shows the problem:
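    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage; // only needed by the commented-out receive() loop
    import javax.jms.Topic;
    import javax.jms.TopicConnectionFactory;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SimpleConsumer {
        public static void main(String[] args) throws Exception {
            // Failover transport wrapping a single broker URL
            String url = "failover:(tcp://ACTIVE_MQ_HOST:61616)";
            String destination = "test-topic";

            TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Topic topic = session.createTopic(destination);
            MessageConsumer consumer = session.createConsumer(topic);
            connection.start();

            // Uncomment these lines and comment out the lines below and it will work
            // while (true) {
            //     Message msg = consumer.receive();
            //     if (msg instanceof TextMessage) {
            //         System.out.println("msg received = " + msg);
            //     }
            // }

            // Asynchronous delivery: main() returns right after the listener is registered
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message msg) {
                    System.out.println("msg received = " + msg);
                }
            });
        }
    }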
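I would like this to work with a MessageListener, since it is non-blocking and asynchronous.
Any help is greatly appreciated.
I have tried some of the things mentioned in the JIRA report above, namely running this in a non-daemon thread, but that did not work.
This is what I tried: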
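    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.Topic;
    import javax.jms.TopicConnectionFactory;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SimpleConsumerThread {
        public static void main(String[] args) throws Exception {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        String url = "failover:(tcp://ACTIVEMQ_HOST:61616)";
                        String destination = "test-topic";

                        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
                        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Topic topic = session.createTopic(destination);
                        MessageConsumer consumer = session.createConsumer(topic);
                        connection.start();

                        // run() returns as soon as the listener has been registered
                        consumer.setMessageListener(new MessageListener() {
                            @Override
                            public void onMessage(Message msg) {
                                System.out.println("msg received = " + msg);
                            }
                        });
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            };
            // Explicitly a user (non-daemon) thread
            t.setDaemon(false);
            t.start();
        }
    }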
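
One thing I noticed about the threaded attempt above: run() returns right after setMessageListener is called, so the non-daemon thread ends almost immediately and cannot keep the JVM alive on its own. Below is a minimal sketch of what I mean by a non-daemon thread that actually stays alive, using only the JDK. The names KeepAliveSketch and startKeeper are hypothetical (they are not part of my code above or of the ActiveMQ API), and whether a missing long-lived non-daemon thread is really why the consumer exits on broker shutdown is an assumption I have not verified.

```java
import java.util.concurrent.CountDownLatch;

public class KeepAliveSketch {

    /**
     * Hypothetical helper: starts a non-daemon thread that blocks on the
     * given latch until it is counted down.
     */
    public static Thread startKeeper(final CountDownLatch shutdown) {
        Thread keeper = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    // In the real consumer, the connection and the
                    // MessageListener would be set up here (assumption).
                    shutdown.await(); // blocks, so this thread does not end
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and let the thread finish
                    Thread.currentThread().interrupt();
                }
            }
        }, "keep-alive");
        keeper.setDaemon(false); // user thread: the JVM waits for it
        keeper.start();
        return keeper;
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch shutdown = new CountDownLatch(1);
        Thread keeper = startKeeper(shutdown);

        Thread.sleep(200);
        System.out.println("alive while waiting = " + keeper.isAlive());

        // Releasing the latch lets the thread, and then the JVM, exit
        shutdown.countDown();
        keeper.join(1000);
        System.out.println("alive after release = " + keeper.isAlive());
    }
}
```

In the consumer, the latch would presumably be counted down from a shutdown hook or some other explicit stop signal, so the process only exits when asked to.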