我使用Hawtio来浏览我的ActiveMQ队列。我还希望能够在将它重新发送到另一个队列之前编辑JMS消息。让Hawtio在JMS消息中显示超过255个字符
我看不出我如何编辑Hawtio中的消息,但没关系,我想这在代理中直接修改消息并不合法。
相反,我虽然我会复制邮件正文并发送一条新消息,并修改了正文。现在,我面临的问题是我只能看到邮件正文的前255个字符。我如何在hawtio中看到整个ActiveMQ消息?不只是前255个字符。
我使用Hawtio来浏览我的ActiveMQ队列。我还希望能够在将它重新发送到另一个队列之前编辑JMS消息。让Hawtio在JMS消息中显示超过255个字符
我看不出我如何编辑Hawtio中的消息,但没关系,我想这在代理中直接修改消息并不合法。
相反,我虽然我会复制邮件正文并发送一条新消息,并修改了正文。现在,我面临的问题是我只能看到邮件正文的前255个字符。我如何在hawtio中看到整个ActiveMQ消息?不只是前255个字符。
Hawtio使用浏览队列JMX接口。它调用队列上的browse()
方法。它将消息返回为CompositedData[]
。
当转换ActiveMQBytesMessage
(检查类别org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory
)时,将添加两个字段BodyLength
和BodyPreview
。这些字段返回以下数据。
BodyLength
- JMS消息体的长度BodyPreview
- 第一个255个字节的JMS消息体(其是硬编码的长度,如Claus Ibsen在他的回答;-)已经说过)入住课org.apache.activemq.broker.jmx.OpenTypeSupport.ByteMessageOpenTypeFactory
的方法Map<String, Object> getFields(Object o)
。
Hawtio使用字段BodyPreview
来显示非文本消息的消息。
入住Hawtio文件hawtio-web/src/main/webapp/app/activemq/js/browse.ts
function createBodyText(message) {
if (message.Text) {
...
} else if (message.BodyPreview) {
...
if (code === 1 || code === 2) {
// bytes and text
var len = message.BodyPreview.length;
var lenTxt = "" + textArr.length;
body = "bytes:\n" + bytesData + "\n\ntext:\n" + textData;
message.textMode = "bytes (" + len + " bytes) and text (" + lenTxt + " chars)";
} else {
// bytes only
var len = message.BodyPreview.length;
body = bytesData;
message.textMode = "bytes (" + len + " bytes)";
}
...
} else {
message.textMode = "unsupported";
...
如果你想改变它,你要么必须改变它在ActiveMQ或Hawtio。
一个冗长而详细的示例来演示解释。
import static java.lang.System.out;
import java.lang.management.ManagementFactory;
import java.util.Enumeration;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeType;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
public class BodyPreviewExample {
public static void main(String[] args) throws Exception {
String password = "password";
String user = "user";
String queueName = "TEST_QUEUE";
String brokerUrl = "tcp://localhost:61616";
BrokerService broker = BrokerFactory.createBroker("broker:"+brokerUrl);
broker.start();
broker.waitUntilStarted();
Connection conn = new ActiveMQConnectionFactory(brokerUrl)
.createConnection(user, password);
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue producerQueue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(producerQueue);
// create a dummy message
StringBuilder sb = new StringBuilder(1000);
for (int i = 0; i < 100; i++) {
sb.append(">CAFEBABE<");
}
// create and send a JMSBytesMessage
BytesMessage bytesMsg = session.createBytesMessage();
bytesMsg.writeBytes(sb.toString().getBytes());
producer.send(bytesMsg);
// create and send a JMSTextMessage
TextMessage textMsg = session.createTextMessage();
textMsg.setText(sb.toString());
producer.send(textMsg);
producer.close();
out.printf("%nmessage info via session browser%n");
String format = "%-20s = %s%n";
Queue consumerQueue = session.createQueue(queueName);
QueueBrowser browser = session.createBrowser(consumerQueue);
for (Enumeration p = browser.getEnumeration(); p.hasMoreElements();) {
out.println();
Object next = p.nextElement();
if (next instanceof ActiveMQBytesMessage) {
ActiveMQBytesMessage amq = (ActiveMQBytesMessage) next;
out.printf(format, "JMSMessageID", amq.getJMSMessageID());
out.printf(format, "JMSDestination", amq.getJMSDestination());
out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
out.printf(format, "BodyLength", amq.getBodyLength());
} else if (next instanceof ActiveMQTextMessage) {
ActiveMQTextMessage amq = (ActiveMQTextMessage) next;
out.printf(format, "JMSMessageID", amq.getJMSMessageID());
out.printf(format, "JMSDestination", amq.getJMSDestination());
out.printf(format, "JMSXMimeType", amq.getJMSXMimeType());
out.printf(format, "text.length", amq.getText().length());
} else {
out.printf("unhandled message type: %s%n", next.getClass());
}
}
session.close();
conn.close();
// access the queue via JMX
out.printf("%nmessage info via JMX browse operation%n");
MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
ObjectName name = new ObjectName("org.apache.activemq:type=Broker"
+ ",brokerName=localhost"
+ ",destinationType=Queue"
+ ",destinationName=" + queueName);
QueueViewMBean queue
= MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
name, QueueViewMBean.class, true);
CompositeData[] browse = queue.browse();
for (CompositeData compositeData : browse) {
out.println();
CompositeType compositeType = compositeData.getCompositeType();
out.printf(format, "CompositeType", compositeType.getTypeName());
out.printf(format,"JMSMessageID",compositeData.get("JMSMessageID"));
if (compositeData.containsKey("BodyLength")) {
// body length of the ActiveMQBytesMessage
Long bodyLength = (Long) compositeData.get("BodyLength");
out.printf(format, "BodyLength", bodyLength);
// the content displayed by hawtio
Byte[] bodyPreview = (Byte[]) compositeData.get("BodyPreview");
out.printf(format, "size of BodyPreview", bodyPreview.length);
} else if (compositeData.containsKey("Text")) {
String text = (String) compositeData.get("Text");
out.printf(format, "Text.length()", text.length());
}
}
// uncomment if you want to check with e.g. JConsole
// TimeUnit.MINUTES.sleep(5);
broker.stop();
}
}
例如输出
message info via session browser
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:1
JMSDestination = queue://TEST_QUEUE
JMSXMimeType = jms/bytes-message
BodyLength = 1000
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:2
JMSDestination = queue://TEST_QUEUE
JMSXMimeType = jms/text-message
text.length = 1000
message info via JMX browse operation
CompositeType = org.apache.activemq.command.ActiveMQBytesMessage
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:1
BodyLength = 1000
size of BodyPreview = 255
CompositeType = org.apache.activemq.command.ActiveMQTextMessage
JMSMessageID = ID:hostname-50075-1467979678722-3:1:1:1:2
Text.length() = 1000
我认为在ActiveMQ的硬编码的限制,当你查询和浏览使用JMX API是什么hawtio使用的队列。但不记得它是否只有255个字节或更高。
查找范围hawtio设置,有可能一个ActiveMQ的插件设置来改变255个字符,不记得要么;)
我不能hawtio找到任何设置,将输出限制为255我想提供我的用户从一个ActiveMQ的队列手动复制整个消息的能力。你使用什么工具? – pmartin8
如何在Java代码中实现它? – Solo
@Solo你是什么意思“你如何在Java代码中实现它?”?这个例子在Java中。 – SubOptimal