2015-09-01 34 views
0

我使用Hawtio来浏览我的ActiveMQ队列。我还希望能够在将它重新发送到另一个队列之前编辑JMS消息。让Hawtio在JMS消息中显示超过255个字符

我看不出我如何编辑Hawtio中的消息,但没关系,我想这在代理中直接修改消息并不合法。

相反,我虽然我会复制邮件正文并发送一条新消息,并修改了正文。现在,我面临的问题是我只能看到邮件正文的前255个字符。我如何在hawtio中看到整个ActiveMQ消息?不只是前255个字符。

回答

2

Hawtio使用浏览队列JMX接口。它调用队列上的browse()方法。它将消息返回为CompositedData[]

当转换ActiveMQBytesMessage(检查类别org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory)时,将添加两个字段BodyLengthBodyPreview。这些字段返回以下数据。

  • BodyLength - JMS消息体的长度
  • BodyPreview - 第一个255个字节的JMS消息体(其是硬编码的长度,如Claus Ibsen在他的回答;-)已经说过)

入住课org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory的方法Map<String, Object> getFields(Object o)

Hawtio使用字段BodyPreview来显示非文本消息的消息。

入住Hawtio文件hawtio-web/src/main/webapp/app/activemq/js/browse.ts

function createBodyText(message) { 
    if (message.Text) { 
     ... 
    } else if (message.BodyPreview) { 
     ... 
     if (code === 1 || code === 2) { 
      // bytes and text 
      var len = message.BodyPreview.length; 
      var lenTxt = "" + textArr.length; 
      body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData; 
      message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)"; 
     } else { 
      // bytes only 
      var len = message.BodyPreview.length; 
      body = bytesData; 
      message.textMode = "bytes (" + len + " bytes)"; 
     } 
     ... 
    } else { 
     message.textMode = "unsupported"; 
     ... 

如果你想改变它,你要么必须改变它在ActiveMQHawtio

一个冗长而详细的示例来演示解释。

import static java.lang.System.out; 
import java.lang.management.ManagementFactory; 
import java.util.Enumeration; 
import java.util.concurrent.TimeUnit; 
import javax.jms.BytesMessage; 
import javax.jms.Connection; 
import javax.jms.MessageProducer; 
import javax.jms.Queue; 
import javax.jms.QueueBrowser; 
import javax.jms.Session; 
import javax.jms.TextMessage; 
import javax.management.MBeanServer; 
import javax.management.MBeanServerInvocationHandler; 
import javax.management.ObjectName; 
import javax.management.openmbean.CompositeData; 
import javax.management.openmbean.CompositeType; 
import org.apache.activemq.ActiveMQConnectionFactory; 
import org.apache.activemq.broker.BrokerFactory; 
import org.apache.activemq.broker.BrokerService; 
import org.apache.activemq.broker.jmx.QueueViewMBean; 
import org.apache.activemq.command.ActiveMQBytesMessage; 
import org.apache.activemq.command.ActiveMQTextMessage; 

public class BodyPreviewExample { 

    public static void main(String[] args) throws Exception { 
     String password = "password"; 
     String user = "user"; 
     String queueName = "TEST_QUEUE"; 
     String brokerUrl = "tcp://localhost:61616"; 

     BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl); 
     broker.start(); 
     broker.waitUntilStarted(); 

     Connection conn = new ActiveMQConnectionFactory(brokerUrl) 
       .createConnection(user, password); 
     conn.start(); 

     Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     Queue producerQueue = session.createQueue(queueName); 
     MessageProducer producer = session.createProducer(producerQueue); 

     // create a dummy message 
     StringBuilder sb = new StringBuilder(1000); 
     for (int i = 0; i < 100; i++) { 
      sb.append(">CAFEBABE<"); 
     } 

     // create and send a JMSBytesMessage 
     BytesMessage bytesMsg = session.createBytesMessage(); 
     bytesMsg.writeBytes(sb.toString().getBytes()); 
     producer.send(bytesMsg); 

     // create and send a JMSTextMessage 
     TextMessage textMsg = session.createTextMessage(); 
     textMsg.setText(sb.toString()); 
     producer.send(textMsg); 

     producer.close(); 

     out.printf("%nmessage info via session browser%n"); 
     String format = "%-20s = %s%n"; 
     Queue consumerQueue = session.createQueue(queueName); 
     QueueBrowser browser = session.createBrowser(consumerQueue); 
     for (Enumeration p = browser.getEnumeration(); p.hasMoreElements();) { 
      out.println(); 
      Object next = p.nextElement(); 
      if (next instanceof ActiveMQBytesMessage) { 
       ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next; 
       out.printf(format, "JMSMessageID", amq.getJMSMessageID()); 
       out.printf(format, "JMSDestination", amq.getJMSDestination()); 
       out.printf(format, "JMSXMimeType", amq.getJMSXMimeType()); 
       out.printf(format, "BodyLength", amq.getBodyLength()); 
      } else if (next instanceof ActiveMQTextMessage) { 
       ActiveMQTextMessage amq = (ActiveMQTextMessage) next; 
       out.printf(format, "JMSMessageID", amq.getJMSMessageID()); 
       out.printf(format, "JMSDestination", amq.getJMSDestination()); 
       out.printf(format, "JMSXMimeType", amq.getJMSXMimeType()); 
       out.printf(format, "text.length", amq.getText().length()); 
      } else { 
       out.printf("unhandled message type: %s%n", next.getClass()); 
      } 
     } 
     session.close(); 
     conn.close(); 

     // access the queue via JMX 
     out.printf("%nmessage info via JMX browse operation%n"); 
     MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer(); 
     ObjectName name = new ObjectName("org.apache.activemq:type=Broker" 
       + ",brokerName=localhost" 
       + ",destinationType=Queue" 
       + ",destinationName=" + queueName); 
     QueueViewMBean queue 
       = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
         name, QueueViewMBean.class, true); 
     CompositeData[] browse = queue.browse(); 
     for (CompositeData compositeData : browse) { 
      out.println(); 
      CompositeType compositeType = compositeData.getCompositeType(); 
      out.printf(format, "CompositeType", compositeType.getTypeName()); 
      out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID")); 
      if (compositeData.containsKey("BodyLength")) { 
       // body length of the ActiveMQBytesMessage 
       Long bodyLength = (Long) compositeData.get("BodyLength"); 
       out.printf(format, "BodyLength", bodyLength); 
       // the content displayed by hawtio 
       Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview"); 
       out.printf(format, "size of BodyPreview", bodyPreview.length); 
      } else if (compositeData.containsKey("Text")) { 
       String text = (String) compositeData.get("Text"); 
       out.printf(format, "Text.length()", text.length()); 
      } 
     } 
     // uncomment if you want to check with e.g. JConsole 
     // TimeUnit.MINUTES.sleep(5); 
     broker.stop(); 
    } 
} 

例如输出

message info via session browser 

JMSMessageID   = ID:hostname-50075-1467979678722-3:1:1:1:1 
JMSDestination  = queue://TEST_QUEUE 
JMSXMimeType   = jms/bytes-message 
BodyLength   = 1000 

JMSMessageID   = ID:hostname-50075-1467979678722-3:1:1:1:2 
JMSDestination  = queue://TEST_QUEUE 
JMSXMimeType   = jms/text-message 
text.length   = 1000 

message info via JMX browse operation 

CompositeType  = org.apache.activemq.command.ActiveMQBytesMessage 
JMSMessageID   = ID:hostname-50075-1467979678722-3:1:1:1:1 
BodyLength   = 1000 
size of BodyPreview = 255 

CompositeType  = org.apache.activemq.command.ActiveMQTextMessage 
JMSMessageID   = ID:hostname-50075-1467979678722-3:1:1:1:2 
Text.length()  = 1000 
+0

如何在Java代码中实现它? – Solo

+0

@Solo你是什么意思“你如何在Java代码中实现它?”?这个例子在Java中。 – SubOptimal

1

我认为在ActiveMQ的硬编码的限制,当你查询和浏览使用JMX API是什么hawtio使用的队列。但不记得它是否只有255个字节或更高。

查找范围hawtio设置,有可能一个ActiveMQ的插件设置来改变255个字符,不记得要么;)

+0

我不能hawtio找到任何设置,将输出限制为255我想提供我的用户从一个ActiveMQ的队列手动复制整个消息的能力。你使用什么工具? – pmartin8

相关问题