2017-05-23 39 views
0

背景: 我有一个标准的生产者消费者队列,消费者缓慢,而生产者速度很快。期望是每当生产者完成所请求的消息时,它就认可该消息,并且生产者将假定与消息相关的任务完成。由于生产者速度很快,我不希望生产线等待,相反,只要消息被确认,就应该调用回调。由于JMS在这方面受到限制,并且我尽可能直接使用了像ActiveMQMessageProducer这样的ActiveMQ类。ActiveMQ实现异步确认JAVA 8

问题: 消息正在自动确认,即使Consumer尚未启动,注册的异步回调也会被调用。 public void send(Destination destination, Message message, AsyncCallback onComplete)

生产者

public static boolean setup() {  
     Producer.connectionFactory = new 
     ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); 
     // Create a Connection 
     Producer.connection = 
      (ActiveMQConnection)connectionFactory.createConnection(); 
     connection.setAlwaysSessionAsync(true); 
     connection.start();   
    } 

public Producer() { 
     session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
     destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
     producer = (ActiveMQMessageProducer)session.createProducer(destination); 
     producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 
     } 
... 

public void run() { 
     long id = messageID.getAndIncrement();   
     String text = "Hello world!" 
     Message message = session.createTextMessage(text); 
     producer.send(message, new MessageCompletion(id, this.messageRundown)); 
    } 

消费者

public static boolean setup() {  
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");  
    Consumer.connection = (ActiveMQConnection)connectionFactory.createConnection(); 
    connection.setAlwaysSessionAsync(true);   
    return true; 
} 

public Consumer() { 
    session = (ActiveMQSession)connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); 
    destination = (ActiveMQDestination)session.createQueue("TEST.FOO"); 
    consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); 
    consumer.setMessageListener(this); 
    connection.start(); 
} 

// implements MessageListener 
@Override 
public void onMessage(Message message) {  
    messageQueue.add(message); 
} 
public void run() { 
    while(true) { 
     Message message = messageQueue.poll(); 
     while(message != null) { 
      // do some work    
      message.acknowledge(); 
      message = messageQueue.poll();    
     } 
     Thread.sleep(10000);    
    } 
} 

虽然不需要消费者我将它作为参考,东西已被删除,以确保简洁,这是一部分工作代码。

回答

1

您对承认方式的理解是错误的。发件人的异步回调仅告诉您代理已收到该消息。如果它是一个持久发送,回调将表明该消息也被写入磁盘。

在JMS或大多数其他消息传递代理中,生产者和消费者没有耦合。生产者在队列中放置消息,然后消费者可以随时前来并从该队列中消费。两者之间没有耦合,生产者在继续产生下一条消息之前不能等待消费者。

如果你想知道何时处理特定的消息,所以你可以扼杀工作,那么你想看看JMS Request/Response风格的消息模式。

+0

谢谢!我现在明白了这一点,异步回调实现的JMSException处理程序现在也加起来了。因此,对于高性能,持久的消息队列,保持异步以确保我们不会在这方面花费太多时间仍然是有意义的。 – amritanshu

+0

对于持久性消息,您无需权衡是否想知道每条消息实际上何时会保持持久,如果有任何性能提升,就会获得很小的消息。 –