2011-12-20 37 views
6

我有一个用Java编写的单线程ActiveMQ使用者。我试图做的就是从队列中接收()一个消息,尝试将它发送给一个Web服务,如果它成功确认()它。如果Web服务调用失败,我希望消息停留在队列中,并在超时后重新发送。无法让ActiveMQ重新发送邮件

除了重新发送部分,它或多或少都在工作:每次重新启动我的使用者时,它会为每个仍在队列中的每个消息获取一条消息,但发送失败后,消息永远不会发送。

我的代码如下所示:

public boolean init() throws JMSException, FileNotFoundException, IOException { 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
    RedeliveryPolicy policy = new RedeliveryPolicy(); 
    policy.setInitialRedeliveryDelay(500); 
    policy.setBackOffMultiplier(2); 
    policy.setUseExponentialBackOff(true); 

    connectionFactory.setRedeliveryPolicy(policy); 
    connectionFactory.setUseRetroactiveConsumer(true); // ???? 
    Connection connection = connectionFactory.createConnection(); 

    connection.setExceptionListener(this); 
    connection.start(); 

    session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); 
    destination = session.createQueue(subject); //??? 

    consumer = session.createConsumer(destination); 
    //consumer.setMessageListener(this); // message listener had same behaviour 

} 

private void process() { 
    while(true) { 
     System.out.println("Waiting..."); 
     try { 
      Message message = consumer.receive(); 
      onMessage(message); 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
     try { 
      Thread.sleep(500); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

@Override 
public void onMessage(Message message) { 
    System.out.println("onMessage"); 
    messagesReceived++; 

    if (message instanceof TextMessage) { 
     try { 
      TextMessage txtMsg = (TextMessage) message; 
      String msg = txtMsg.getText(); 

      if(!client.sendMessage(msg)) { 
       System.out.println("Webservice call failed. Keeping message"); 
       //message. 
      } else { 
       message.acknowledge(); 
      } 

      if (transacted) { 
       if ((messagesReceived % batch) == 0) { 
        System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived); 
        session.commit(); 
       } 
      } 
     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

我不是目前使用的交易(也许我应该是什么?)。

我敢肯定我错过了一些简单的事情,很快就会拍我的前额,但我似乎无法弄清楚这应该如何工作。谢谢!


编辑:不能回答这个自己不足够的代表:

OK,经过一番更多的试验,它证明交易是做到这一点的唯一方法。下面是新的代码:

public boolean init() throws JMSException, FileNotFoundException, IOException { 
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 
    RedeliveryPolicy policy = new RedeliveryPolicy(); 
    policy.setInitialRedeliveryDelay(1000L); 
    policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES); 

    connectionFactory.setRedeliveryPolicy(policy); 
    connectionFactory.setUseRetroactiveConsumer(true); 
    Connection connection = connectionFactory.createConnection(); 

    connection.setExceptionListener(this); 
    connection.start(); 

    session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE); 
    destination = session.createQueue(subject); 

    consumer = session.createConsumer(destination); 
} 

@Override 
public void onMessage(Message message) { 
    System.out.println("onMessage"); 
    messagesReceived++; 

    if (message instanceof TextMessage) { 
     try { 
      TextMessage txtMsg = (TextMessage) message; 
      String msg = txtMsg.getText(); 

      if(client.sendMessage(msg)) { 
       if(transacted) { 
        System.out.println("Call succeeded - committing message"); 
        session.commit(); 
       } 
       //message.acknowledge(); 
      } else { 
       if(transacted) { 
        System.out.println("Webservice call failed. Rolling back message"); 
        session.rollback(); 
       } 
      } 

     } catch (JMSException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

现在,消息被重新发送如重新传递政策规定每1000毫秒。

希望这可以帮助别人! :)

+0

确实非常有用。谢谢! – Amer 2012-02-27 22:07:05

+0

哇,救了我。显然,activemq通常会在10秒左右的时间内将事情“重新加入队列”,除非您像指定的那样指定1。 – rogerdpack 2012-03-26 18:01:56

+0

这对初学者很有帮助。特别是为什么你提到'1000ms'。 – Hanumath 2013-09-11 06:50:31

回答

3

您不必使用事务,CLIENT_ACK/Session.recover时,()也能发挥作用......任何时候发生以下

消息被重新传递到客户端:

  • 使用事务处理会话并调用rollback()。
  • 在调用提交之前关闭事务会话。
  • 会话正在使用CLIENT_ACKNOWLEDGE,并调用Session.recover()。

看到http://activemq.apache.org/message-redelivery-and-dlq-handling.html

+0

我尝试了第三点,你以下面的方式建议。 'Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);'。在我想重发消息到'Activemq'的业务代码之后,我只能再次处理消息(哪个进程失败)。为此我写这样写。 'Session.recover时();'。但是在评估这个程序后,Activemq并没有向用户发送消息。有关我试图检查此[链接]的更多信息(http://stackoverflow.com/questions/18712758/how-can-i-process-unacknowledged-messages-in-activemq) – Hanumath 2013-09-10 10:52:46