RabbitMQ的新手和Java新手。如何在使用Spring ChannelAwareMessageListener时处理RabbitMQ消费者取消通知
我试图写,将使用手动ACK和处理使用java春AMQP抽象消费者取消通知的监听器。我可以通过使用Spring抽象来完成这两项任务吗?
我想写一个侦听器,它将从队列中提取消息并处理该消息(可能写入数据库或其他东西)。我计划使用手动确认,这样如果处理消息失败或由于某种原因无法完成,我可以拒绝并重新发送。到目前为止,我想我已经发现,为了使用Spring AMQP手动确认/确认/拒绝,我必须使用ChannelAwareMessageListener
。
我意识到我应该从RabbitMQ处理消费者取消通知,但是使用ChannelAwareMessageListener
我真的没有看到为此编写代码的方法。我认为处理CCN的唯一方法是使用较低级别的java客户端API编写代码,方法是呼叫channel.basicConsume()
并传递一个新的DefaultConsumer
实例,该实例允许您处理消息传递和取消。
我也看不到我如何在ConnectionFactory
上设置clientProperties
(告诉代理我可以处理CCN),因为我从配置中的bean获取工厂。
我的侦听器的伪代码和容器的创建如下。
public class MyChannelAwareListener implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
msgProcessed = processMessage(message);
if(msgProcessed)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
else
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
public static void main(String[] args) throws Exception
{
ConnectionFactory rabbitConnectionFactory;
ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH);
rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory");
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
MyChannelAwareListener listener = new MyChannelAwareListener();
container.setMessageListener(listener);
container.setQueueNames("myQueue");
container.setConnectionFactory(rabbitConnectionFactory);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.start();
}