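I am using Spring Boot to bootstrap a Spring AMQP application that connects to RabbitMQ queues. I would like to send a message from the producer and specify the reply queue, so that the consumer only has to send its reply without working out the destination (and so the reply data does not have to be passed in the message itself). Is this kind of reply handling available out of the box in Spring AMQP?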
This is the configuration I have (shared between the producer and the consumer):
private static final String QUEUE_NAME = "testQueue";
private static final String ROUTING_KEY = QUEUE_NAME;
public static final String REPLY_QUEUE = "replyQueue";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String IP = "localhost";
private static final String VHOST = "/";
private static final int PORT = 5672;
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME));
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE));
    return template;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setVirtualHost(VHOST);
    connectionFactory.setPort(PORT);
    return connectionFactory;
}
I send a message as follows:
public Object sendAndReply(String queue, String content){
    return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE);
            return message;
        }
    });
}
and await replies as follows:
public void replyToQueue(String queue){
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() {
        @Override
        public Data handle(Data payload) {
            System.out.println("Received: "+payload.toString());
            return new Data("This is a reply for: "+payload.toString());
        }
    });
}
However, when sending, I get the following exception:
Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66)
at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673)
at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663)
at prodsend.Prod.sendAndReply(ReplyTester.java:137)
at prodsend.ReplyTester.sendMessages(ReplyTester.java:49)
at prodsend.ReplyTester.main(ReplyTester.java:102)
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
at org.springframework.util.Assert.isNull(Assert.java:89)
at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711)
at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835)
... 8 more
Line 137 of ReplyTester.java is the return line in the sendAndReply method above.
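The exception text and the Assert.isNull frame inside doSendAndReceiveWithTemporary suggest that the send-and-receive path assigns replyTo itself and rejects a message that already carries one. Below is a toy, standard-library-only model of that check. ToyMessage and ToyTemplate are hypothetical names invented for illustration; this is not Spring AMQP code and makes no claim about how RabbitTemplate is implemented beyond what the trace shows.

```java
import java.util.UUID;

// Toy model only: none of these classes are Spring AMQP types.
class ReplyToCheckSketch {

    /** Hypothetical stand-in for a message: a body plus a replyTo header. */
    static final class ToyMessage {
        final String body;
        String replyTo; // null until someone sets it

        ToyMessage(String body) {
            this.body = body;
        }
    }

    /** Hypothetical stand-in for a template that owns the reply queue. */
    static final class ToyTemplate {
        String sendAndReceive(ToyMessage message) {
            // Same wording as the assertion message in the stack trace.
            if (message.replyTo != null) {
                throw new IllegalArgumentException(
                        "Send-and-receive methods can only be used if the Message "
                        + "does not already have a replyTo property.");
            }
            // The template chooses its own temporary reply queue name.
            message.replyTo = "tmp-" + UUID.randomUUID();
            // Pretend a consumer answered on that queue.
            return "This is a reply for: " + message.body;
        }
    }

    public static void main(String[] args) {
        ToyTemplate template = new ToyTemplate();

        // Case 1: replyTo left unset, which the template accepts.
        System.out.println(template.sendAndReceive(new ToyMessage("hello")));

        // Case 2: replyTo preset by the caller, as the MessagePostProcessor does.
        ToyMessage preset = new ToyMessage("hello");
        preset.replyTo = "replyQueue";
        try {
            template.sendAndReceive(preset);
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Under that reading, calling convertSendAndReceive without the MessagePostProcessor that sets replyTo would presumably avoid the assertion, with the template using its own temporary reply queue rather than replyQueue.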
EDIT: Here is the Data class used above :)
class Data{
    public String d;
    public Data(String s){ d = s; }
    public String toString() { return d; }
}
What would be done for the asynchronous approach? – mangusbrother 2014-09-11 08:29:24
@mangusbrother Updated the answer - it's a bit more complicated than that. Isn't it always? – 2014-09-11 08:38:42