我有一个用Java编写的单线程ActiveMQ使用者。我试图做的就是从队列中接收()一个消息,尝试将它发送给一个Web服务,如果它成功确认()它。如果Web服务调用失败,我希望消息停留在队列中,并在超时后重新发送。无法让ActiveMQ重新发送邮件
除了重新发送部分,它或多或少都在工作:每次重新启动我的使用者时,它会为每个仍在队列中的每个消息获取一条消息,但发送失败后,消息永远不会发送。
我的代码如下所示:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true); // ????
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
destination = session.createQueue(subject); //???
consumer = session.createConsumer(destination);
//consumer.setMessageListener(this); // message listener had same behaviour
}
private void process() {
while(true) {
System.out.println("Waiting...");
try {
Message message = consumer.receive();
onMessage(message);
} catch (JMSException e) {
e.printStackTrace();
}
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(!client.sendMessage(msg)) {
System.out.println("Webservice call failed. Keeping message");
//message.
} else {
message.acknowledge();
}
if (transacted) {
if ((messagesReceived % batch) == 0) {
System.out.println("Commiting transaction for last " + batch + " messages; messages so far = " + messagesReceived);
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
我不是目前使用的交易(也许我应该是什么?)。
我敢肯定我错过了一些简单的事情,很快就会拍我的前额,但我似乎无法弄清楚这应该如何工作。谢谢!
编辑:不能回答这个自己不足够的代表:
OK,经过一番更多的试验,它证明交易是做到这一点的唯一方法。下面是新的代码:
public boolean init() throws JMSException, FileNotFoundException, IOException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setInitialRedeliveryDelay(1000L);
policy.setMaximumRedeliveries(RedeliveryPolicy.NO_MAXIMUM_REDELIVERIES);
connectionFactory.setRedeliveryPolicy(policy);
connectionFactory.setUseRetroactiveConsumer(true);
Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(this);
connection.start();
session = connection.createSession(transacted, ActiveMQSession.CLIENT_ACKNOWLEDGE);
destination = session.createQueue(subject);
consumer = session.createConsumer(destination);
}
@Override
public void onMessage(Message message) {
System.out.println("onMessage");
messagesReceived++;
if (message instanceof TextMessage) {
try {
TextMessage txtMsg = (TextMessage) message;
String msg = txtMsg.getText();
if(client.sendMessage(msg)) {
if(transacted) {
System.out.println("Call succeeded - committing message");
session.commit();
}
//message.acknowledge();
} else {
if(transacted) {
System.out.println("Webservice call failed. Rolling back message");
session.rollback();
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
现在,消息被重新发送如重新传递政策规定每1000毫秒。
希望这可以帮助别人! :)
确实非常有用。谢谢! – Amer 2012-02-27 22:07:05
哇,救了我。显然,activemq通常会在10秒左右的时间内将事情“重新加入队列”,除非您像指定的那样指定1。 – rogerdpack 2012-03-26 18:01:56
这对初学者很有帮助。特别是为什么你提到'1000ms'。 – Hanumath 2013-09-11 06:50:31