2013-06-12 29 views
9

RabbitMQ的新手和Java新手。如何在使用Spring ChannelAwareMessageListener时处理RabbitMQ消费者取消通知

我试图写,将使用手动ACK和处理使用java春AMQP抽象消费者取消通知的监听器。我可以通过使用Spring抽象来完成这两项任务吗?

我想写一个侦听器,它将从队列中提取消息并处理该消息(可能写入数据库或其他东西)。我计划使用手动确认,这样如果处理消息失败或由于某种原因无法完成,我可以拒绝并重新发送。到目前为止,我想我已经发现,为了使用Spring AMQP手动确认/确认/拒绝,我必须使用ChannelAwareMessageListener

我意识到我应该从RabbitMQ处理消费者取消通知,但是使用ChannelAwareMessageListener我真的没有看到为此编写代码的方法。我认为处理CCN的唯一方法是使用较低级别的java客户端API编写代码,方法是呼叫channel.basicConsume()并传递一个新的DefaultConsumer实例,该实例允许您处理消息传递和取消。

我也看不到我如何在ConnectionFactory上设置clientProperties(告诉代理我可以处理CCN),因为我从配置中的bean获取工厂。

我的侦听器的伪代码和容器的创建如下。

public class MyChannelAwareListener implements ChannelAwareMessageListener 
{ 
    @Override 
    public void onMessage(Message message, Channel channel) throws Exception 
    { 
     msgProcessed = processMessage(message); 

     if(msgProcessed)  
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); 
     else 
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); 
    } 
} 

public static void main(String[] args) throws Exception 
{ 
    ConnectionFactory rabbitConnectionFactory; 
    ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext (MY_CONTEXT_PATH); 
    rabbitConnectionFactory = (ConnectionFactory)ctx.getBean("rabbitConnectionFactory"); 

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 

    MyChannelAwareListener listener = new MyChannelAwareListener(); 
    container.setMessageListener(listener); 
    container.setQueueNames("myQueue"); 
    container.setConnectionFactory(rabbitConnectionFactory); 
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
    container.start(); 
} 

回答

1

为了设置您需要使用setClientProperties方法在ConnectionFactory客户端属性(假设该连接工厂是从RabbitMQ的Java库的对象)。此方法期望Map<String, Object>包含客户端的属性和功能。下面的线是RabbitMQ的Java库中的默认值:

Map<String,Object> props = new HashMap<String, Object>(); 
props.put("product", LongStringHelper.asLongString("RabbitMQ")); 
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION)); 
props.put("platform", LongStringHelper.asLongString("Java")); 
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT)); 
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE)); 

Map<String, Object> capabilities = new HashMap<String, Object>(); 
capabilities.put("publisher_confirms", true); 
capabilities.put("exchange_exchange_bindings", true); 
capabilities.put("basic.nack", true); 
capabilities.put("consumer_cancel_notify", true); 

props.put("capabilities", capabilities); 

为了管理ACK和消费者取消我不知道如何与Spring AMQP抽象做到这一点,但它是完全可行与channel.basicConsume它给你有可能通过所有的回调方法处理所有的场景:

http://www.rabbitmq.com/releases/rabbitmq-java-client/v3.1.5/rabbitmq-java-client-javadoc-3.1.5/

希望这有助于!