2016-02-05 85 views
2

我很难找到Spring AMQP/Rabbit MQ中计划/延迟消息的方式。
经过多次搜索后,我仍然无法在Spring AMQP中做到这一点。有人可以告诉我如何在Spring AMQP中使用x-delay
我想延迟消息,如果消费者一方发生某种异常。 RabbitMQ的说要添加X-延迟和安装的我已经有了这样的插件,但还是消息不延时Spring AMQP中的计划/延迟消息传递RabbitMq



我得到这消息
收到<(身体立刻正在添加:“[B @ 60a4ae5f (字节[26])'MessageProperties [头= {X-延迟= 15000}

@Bean 
ConnectionFactory connectionFactory(){ 

    CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    connectionFactory.setPort(1500); 
    connectionFactory.setPublisherReturns(true); 
    return connectionFactory; 

} 

@Bean 
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) { 
    return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null); 
    //return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
DirectExchange exchange() { 
    DirectExchange exchange=new DirectExchange("delay-exchange"); 
    return exchange; 
} 

消费者---
@Override

public void onMessage(Message message, Channel channel) throws Exception { 

    System.out.println("Received <" + message+ ">" +rabbitTemplate); 

    if(i==1){ 
     AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder(); 
     Map<String,Object> headers = message.getMessageProperties().getHeaders(); 
     headers.put("x-delay", 15000); 
     props.headers(headers); 
     i++; 
     channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), 
       props.build(), message.getBody()); 
    } 
    } 
+1

不确定是谁推荐你用'jms'标记标记这个问题,但这是不正确的。这只是关于RabbitMQ。是的,'spring-amqp'。正在修复... –

回答

4

首先,您好像没有与Scheduling Messages with RabbitMQ文章如下:

要使用延迟的消息交换,你只需要如下声明提供“X-延迟的消息”交换式交换:

Map<String, Object> args = new HashMap<String, Object>(); 
args.put("x-delayed-type", "direct"); 
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args); 

我会说同样可以使用Spring AMQP来实现:

@Bean 
CustomExchange delayExchange() { 
    Map<String, Object> args = new HashMap<String, Object>(); 
    args.put("x-delayed-type", "direct"); 
    return new CustomExchange("my-exchange", "x-delayed-message", true, false, args); 
} 

另一个值得关注哟你真的应该发布消息到delay-exchange,而不是任何其他。再说一遍:无论如何,该文档都提到了这一点。

UPDATE

由于春季AMQP 1.6延迟的消息得到支撑外的现成功能: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available

+0

是的,我没有在Exchange中获得这种x-delayed-message类型,因为它不在Spring AMQP中。我经历了很多次,但无法弄清楚它是一种Exchange类型。无论如何,我现在能够做到。 –

+0

感谢张贴在Spring AMQP以及... –

+0

@Artem我应该在哪里寻找spring-rabbit-1.6.xsd –