背景: 我有一个标准的生产者消费者队列,消费者缓慢,而生产者速度很快。期望是每当生产者完成所请求的消息时,它就认可该消息,并且生产者将假定与消息相关的任务完成。由于生产者速度很快,我不希望生产线等待,相反,只要消息被确认,就应该调用回调。由于JMS在这方面受到限制,并且我尽可能直接使用了像ActiveMQMessageProducer
这样的ActiveMQ类。ActiveMQ实现异步确认JAVA 8
问题: 消息正在自动确认,即使Consumer尚未启动,注册的异步回调也会被调用。 public void send(Destination destination, Message message, AsyncCallback onComplete)
生产者
public static boolean setup() {
Producer.connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
// Create a Connection
Producer.connection =
(ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
connection.start();
}
public Producer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
producer = (ActiveMQMessageProducer)session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
...
public void run() {
long id = messageID.getAndIncrement();
String text = "Hello world!"
Message message = session.createTextMessage(text);
producer.send(message, new MessageCompletion(id, this.messageRundown));
}
消费者
public static boolean setup() {
Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection();
connection.setAlwaysSessionAsync(true);
return true;
}
public Consumer() {
session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
destination = (ActiveMQDestination)session.createQueue("TEST.FOO");
consumer = (ActiveMQMessageConsumer)session.createConsumer(destination);
consumer.setMessageListener(this);
connection.start();
}
// implements MessageListener
@Override
public void onMessage(Message message) {
messageQueue.add(message);
}
public void run() {
while(true) {
Message message = messageQueue.poll();
while(message != null) {
// do some work
message.acknowledge();
message = messageQueue.poll();
}
Thread.sleep(10000);
}
}
虽然不需要消费者我将它作为参考,东西已被删除,以确保简洁,这是一部分工作代码。
谢谢!我现在明白了这一点,异步回调实现的JMSException处理程序现在也加起来了。因此,对于高性能,持久的消息队列,保持异步以确保我们不会在这方面花费太多时间仍然是有意义的。 – amritanshu
对于持久性消息,您无需权衡是否想知道每条消息实际上何时会保持持久,如果有任何性能提升,就会获得很小的消息。 –