2014-09-11 29 views
2

我正在使用SpringBoot来启动连接到RabbitMQ队列的SpringAMQP应用程序。我希望能够从生产者发送消息,指定回复队列,以便消费者只需要发送而不必调查目的地(因此不必在消息本身中传递回复数据)。回复在SpringAMQP被预先设置?

这是我

private static final String QUEUE_NAME = "testQueue"; 
private static final String ROUTING_KEY = QUEUE_NAME; 
public static final String REPLY_QUEUE = "replyQueue"; 
private static final String USERNAME = "guest"; 
private static final String PASSWORD = "guest"; 
private static final String IP = "localhost"; 
private static final String VHOST = "/"; 
private static final int PORT = 5672; 

@Bean 
public RabbitTemplate rabbitTemplate() { 
    RabbitTemplate template = new RabbitTemplate(connectionFactory()); 
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME)); 
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE)); 
    return template; 
} 

@Bean 
public AmqpAdmin amqpAdmin() { 
    return new RabbitAdmin(connectionFactory()); 
} 

@Bean 
public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP); 
    connectionFactory.setUsername(USERNAME); 
    connectionFactory.setPassword(PASSWORD); 
    connectionFactory.setVirtualHost(VHOST); 
    connectionFactory.setPort(PORT); 
    return connectionFactory; 
} 

我发送消息如下(生产者和消费者之间共享的)的配置:

public Object sendAndReply(String queue, String content){ 
     return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() { 

      @Override 
      public Message postProcessMessage(Message message) throws AmqpException { 
       message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE); 
       return message; 
      } 
     }); 
    } 

和等待答复如下:

public void replyToQueue(String queue){ 
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() { 
     @Override 
     public Data handle(Data payload) { 
      System.out.println("Received: "+payload.toString()); 
      return new Data("This is a reply for: "+payload.toString()); 
     } 
    }); 
} 

然而,当发送时,我得到以下例外:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66) 
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663) 
    at prodsend.Prod.sendAndReply(ReplyTester.java:137) 
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49) 
    at prodsend.ReplyTester.main(ReplyTester.java:102) 
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property. 
    at org.springframework.util.Assert.isNull(Assert.java:89) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705) 
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) 
    ... 8 more 

该行ReplyTest.137指向上述sendAndReply方法中的return行。


编辑: 这里是:)

class Data{ 
    public String d; 
    public Data(String s){ d = s; } 
    public String toString() { return d; } 
} 

回答

6

documentation上述数据类:

基本RPC图案。使用特定的路由键将消息发送到默认交换机,并尝试接收响应。实现通常会将回复头部设置为专用队列,并等待一段时间受限于超时。

所以该方法convertSendAndReceive处理设置replyTo报头,并返回一个Messaage - 的响应。这是一个同步模式 - RPC。

如果你想这样做异步 - 你似乎 - 不使用这种方法。使用适当的convertAndSend方法并使用适当的MessagePostProcessor添加您的replyTo标头。

由于这是异步的,您需要注册一个单独的处理程序以接收答复。这需要在之前发送消息给对方。该处理程序将在发送消息后的某个时刻被调用 - 何时未知。阅读部分3.5.2异步消费者Spring AQMP Documentation

所以,异步处理流程:

  1. 发件人寄存器上replyTo queueue
  2. 发送的处理程序发送带有replyTo消息集
  3. 客户端调用receiveAndReply,处理该消息,并且向replyTo回复
  4. 发件人回叫方法被触发

同步处理流程是:

  1. 发件人使用sendAndReceive和块
  2. 客户端调用receiveAndReply,处理该消息,并且向replyTo
  3. 发件人发送回复接收应答发送消息时,唤醒和流程它

所以后者的情况下需要发件人等待。由于您使用receiveXXX而不是注册异步处理程序,因此如果客户端需要一段时间才能调用receiveXXX,发件人可能会等待很长时间。顺便说一句,如果你想使用同步方法,但使用特定的replyTo,你可以随时拨打setReplyQueue。对于这种情况,我还提到了一个setReplyTimeout,客户要么不费心阅读消息,要么忘记回复。

+0

对于异步方法,会采取什么措施? – mangusbrother 2014-09-11 08:29:24

+0

@mangusbrother更新了答案 - 比这更复杂一点。是不是总是? – 2014-09-11 08:38:42