2017-04-08 90 views
0

有什么方法可以获取jms队列中待处理消息的统计数量。我的目标是在队列中没有剩余消息进行处理时关闭连接。我怎么能做到这一点。如何获取jms队列中待处理消息的数量

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
    Connection connection = connectionFactory.createConnection("admin", "admin"); 
    connection.start(); 

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

    Destination destination = session.createQueue(subject); 

    MessageConsumer consumer = session.createConsumer(destination); 

    while (true) { 
     Message message = consumer.receive(); 

     if (message instanceof TextMessage) { 
      TextMessage textMessage = (TextMessage) message; 
      System.out.println("Incoming Message:: '" + textMessage.getText() + "'"); 
     } 
    } 

回答

-1

我已经通过使用下面的createBrowser方法做到了这一点,这是我更新的代码。

public static void main(String[] args) throws JMSException { 
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 
Connection connection = connectionFactory.createConnection("admin", "admin"); 
connection.start(); 

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

Destination destination = session.createQueue(subject); 
int queueSize = QueueConsumer.getQueueSize(session, (Queue) destination); 
System.out.println("QUEUE SIZE: " + queueSize); 
MessageConsumer consumer = session.createConsumer(destination); 

for (int i = 0; i < queueSize; i++) { 
    Message message = consumer.receive(); 

    if (message instanceof TextMessage) { 
     TextMessage textMessage = (TextMessage) message; 
     System.out.println("Incomming Message: '" + textMessage.getText() + "'"); 
    } 
} 
connection.close(); 
} 

private int getQueueSize(Session session, Queue queue) { 
    int count = 0; 
    try { 
     QueueBrowser browser = session.createBrowser(queue); 
     Enumeration elems = browser.getEnumeration(); 
     while (elems.hasMoreElements()) { 
      elems.nextElement(); 
      count++; 
     } 
    } catch (JMSException ex) { 
     ex.printStackTrace(); 
    } 
    return count; 
} 
1

的唯一可靠的方式来获得真正的队列计数形成的经纪人是使用JMX MBean的队列,并调用getQueueSize方法。

其他编程方法是使用Statistics Broker Plugin,它要求您能够更改代理配置来安装它。安装后,您可以向控制队列发送特殊消息,并获得有关要监控的目标的详细信息的响应。

使用QueueBrowser并没有给你一个真正的计数,因为浏览器将有多少邮件将被分页到内存中发送给你,因此如果你的队列比限制更深,你将无法获得最大限制实际大小,只是最大页面大小限制的值。

+0

感谢您的回答。我最近开始在Jboss导火索上工作,所以我没有想到它。 –

0

只是打破循环并关闭连接,如果你的JMS消息为空..

while (true) { 
    Message message = consumer.receive(2000); 
    if (message == null){ 
     break; 
    } 
    if (message instanceof TextMessage) { 
     TextMessage textMessage = (TextMessage) message; 
     System.out.println("Incoming Message:: '" + textMessage.getText() + "'"); 
    } 
    } 
    connection.close(); 
+0

这没有奏效,因为默认情况下,consumer.receive()是阻塞的,这意味着它将等待消息到达队列中。 –

+0

那么你可以使用超时与接收方法.. – Nir

0

我已经使用JMX做到了这一点,它的工作感谢名单@Tim BISH

这里是我更新的代码

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi://0.0.0.0:44444/jndi/rmi://0.0.0.0:1099/karaf-root"); 

HashMap<String, String[]> environment = new HashMap<String, String[]>(); 
String[] creds = { "admin", "admin" }; 
environment.put(JMXConnector.CREDENTIALS, creds); 

JMXConnector jmxc = JMXConnectorFactory.connect(url, environment); 
MBeanServerConnection connection = jmxc.getMBeanServerConnection(); 

ObjectName nameConsumers = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq,destinationType=Queue,destinationName=myqueue"); 
DestinationViewMBean mbView = MBeanServerInvocationHandler.newProxyInstance(connection, nameConsumers, DestinationViewMBean.class, true); 
long queueSize = mbView.getQueueSize(); 
System.out.println(queueSize);