2011-07-15 42 views
3

我有一个使用ActiveMQConnectionFactory.createConnection()获取ActiveMQ连接的类。 然后它创建并拥有会话,目的地和消费者(队列)在该目的地上侦听。如何在中断JMS之后终止“ActiveMQ Transport”线程MessageConsumer.receive()

我在消费者上调用receive()或receive(millis)来让我的消息离开队列。在某些情况下,我必须杀死(中断)接收方法被调用的线程。我尝试在此之后关闭会话和连接。不幸的是,我在调用close时会不断收到异常,并且关联的“ActiveMQ Transport”线程(以及与代理的相关连接)仍处于活动状态。我得到的例外是

org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection 
at 
org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253) 
at 
org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94) 
at org.myorg.myservice.job.Job.run(Job.java:206) 
Caused by: javax.jms.JMSException: java.io.InterruptedIOExceptio) 
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62) 
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227) 
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219) 
at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799) 
at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636) 
at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627) 
at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232) 
... 2 more 
Caused by: java.io.InterruptedIOException 
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102) 
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40) 
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60) 
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225) 
... 7 more 

我试过下面连接的URL 故障:// TCP://broker.ip.address:61616 TCP://broker.ip.address:61616 故障转移:// tcp://broker.ip.address:61616?trace = true & closeAsync = false

此外,我已经尝试使用MessageConsumer :: receiveNoWait()调用,只是执行我自己的Thread.sleep,但我总是以上述例外结束,只要我致电以下来关闭连接

try { 
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck 
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner(); 
    taskRunner.shutdown(); 

    if (producer != null) { 
     producer.close(); 
    } 
    if (consumer != null) { 
     consumer.close(); 
    } 
    session.close(); 
    if (connection instanceof ActiveMQConnection) { 
     ActiveMQConnection amqConnection = (ActiveMQConnection) connection; 
     amqConnection.stop(); 
    }   
    connection.stop(); 
    connection.close(); 
} 
catch (ConnectionClosedException e) { 
    // NOOP - this is ok 
} 
catch (Exception e) { 
    throw new MessagingException("Failed to close JMS connection", e, log); 
} 

回答

0

我不知道为什么封闭不适合你。但是你可以通过使用异步消息监听器来完成你想要的。

Connection connection = connectionFactory.createConnection(); 
connection.start(); 

Session session = 
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

Destination queue = session.createQueue("queueName"); 

Consumer consumer = session.createConsumer(queue); 

consumer.setMessageListener(new MessageListener() 
{ 
    public void onMessage(Message message) 
    { 
     // whatever was after the receive() method call 
    } 
}); 

然后就没有必要中断了。当你完成后,关闭的事情了:

consumer.close(); 
session.close(); 
queue.close(); 
connection.close(); 
相关问题