2016-12-21 39 views
0

它就像“休斯顿我们有一个问题在这里”,我需要在第一次尝试处理事件失败后5分钟内安排/延迟消息。 我在这种情况下实现了死信交换。Spring AMQP - 使用TTL的死信机制消息重排队

上失败,路线DLX中的消息 - >重试队列和的5分钟的另一种尝试一个TTL后回来工作队列。

这里是我使用的配置:

public class RabbitMQConfig { 
    @Bean(name = "work") 
    @Primary 
    Queue workQueue() { 
     return new Queue(WORK_QUEUE, true, false, false, null); 
    } 

    @Bean(name = "workExchange") 
    @Primary 
    TopicExchange workExchange() { 
     return new TopicExchange(WORK_EXCHANGE, true, false); 
    } 

    @Bean 
    Binding workBinding(Queue queue, TopicExchange exchange) { 
     return BindingBuilder.bind(workQueue()).to(workExchange()).with("#"); 
    } 

    @Bean(name = "retryExchange") 
    FanoutExchange retryExchange() { 
     return new FanoutExchange(RETRY_EXCHANGE, true, false); 
    } 

    @Bean(name = "retry") 
    Queue retryQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     args.put("x-dead-letter-exchange", WORK_EXCHANGE); 
     args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min 
     return new Queue(RETRY_QUEUE, true, false, false, args); 
    } 

    @Bean 
    Binding retryBinding(Queue queue,FanoutExchange exchange) { 
     return BindingBuilder.bind(retryQueue()).to(retryExchange()); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     return factory; 
    } 

    @Bean 
    Consumer receiver() { 
     return new Consumer(); 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Consumer receiver) { 
     return new MessageListenerAdapter(receiver, "receiveMessage"); 
    } 
} 

Producer.java:

@GetMapping(path = "/hello") 
public String sayHello() { 
    // Producer operation 

     String messages[]; 
     messages = new String[] {" hello "}; 

    for (int i = 0; i < 5; i++) { 
     String message = util.getMessage(messages)+i; 

     rabbitTemplate.convertAndSend("WorkExchange","", message); 
     System.out.println(" Sent '" + message + "'"); 
    } 
    return "hello"; 
} 

Consumer.java:

public class Consumer { 

    @RabbitListener(queues = "WorkQueue") 
    public void receiveMessage(String message, Channel channel, 
      @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException { 

     try { 

      System.out.println("message to be processed: " + message); 
      doWorkTwo(message); 
      channel.basicAck(tag, false); 

     } catch (Exception e) { 
      System.out.println("In the exception catch block"); 
      System.out.println("message in dead letter exchange: " + message); 
      channel.basicPublish("RetryExchange", "", null, message.getBytes()); 

     } 

    } 

    private void doWorkTwo(String task) throws InterruptedException { 

     int c = 0; 
     int b = 5; 
     int d = b/c; 

    } 

} 

它是用一个死的正确方法重试队列等待一次5分钟,在后信换取我的情况和第二次尝试它不等待在重试队列中5分钟(我所提到TTL为5分钟),并移动到该工作队列立即

我击中本地主机上运行该应用程序:8080 /你好网址。

这是我更新的配置。

RabbitMQConfig.java:

@EnableRabbit 
public class RabbitMQConfig { 

    final static String WORK_QUEUE = "WorkQueue"; 
    final static String RETRY_QUEUE = "RetryQueue"; 
    final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange 
    final static String RETRY_EXCHANGE = "RetryExchange"; 
    final static int RETRY_DELAY = 60000; // in ms (1 min) 

    @Bean(name = "work") 
    @Primary 
    Queue workQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     args.put("x-dead-letter-exchange", RETRY_EXCHANGE); 
     return new Queue(WORK_QUEUE, true, false, false, args); 
    } 

    @Bean(name = "workExchange") 
    @Primary 
    DirectExchange workExchange() { 
     return new DirectExchange(WORK_EXCHANGE, true, false); 
    } 

    @Bean 
    Binding workBinding(Queue queue, DirectExchange exchange) { 
     return BindingBuilder.bind(workQueue()).to(workExchange()).with(""); 
    } 

    @Bean(name = "retryExchange") 
    DirectExchange retryExchange() { 
     return new DirectExchange(RETRY_EXCHANGE, true, false); 
    } 

    // Messages will drop off RetryQueue into WorkExchange for re-processing 
    // All messages in queue will expire at same rate 
    @Bean(name = "retry") 
    Queue retryQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     //args.put("x-dead-letter-exchange", WORK_EXCHANGE); 
     //args.put("x-message-ttl", RETRY_DELAY); 
     return new Queue(RETRY_QUEUE, true, false, false, null); 
    } 

    @Bean 
    Binding retryBinding(Queue queue, DirectExchange exchange) { 
     return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(""); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setDefaultRequeueRejected(false); 
     /*factory.setAdviceChain(new Advice[] { 
       org.springframework.amqp.rabbit.config.RetryInterceptorBuilder 
         .stateless() 
         .maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer()) 
         .backOffOptions(1000, 2, 5000) 
         .build() 
     });*/ 
     return factory; 
    } 

    @Bean 
    Consumer receiver() { 
     return new Consumer(); 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Consumer receiver) { 
     return new MessageListenerAdapter(receiver, "receiveMessage"); 
    } 
} 

Consumer.java:

public class Consumer { 

    @RabbitListener(queues = "WorkQueue") 
    public void receiveMessage(String message, Channel channel, 
      @Header(AmqpHeaders.DELIVERY_TAG) Long tag, 
      @Header(required = false, name = "x-death") HashMap<String, String> xDeath) 
      throws IOException, InterruptedException { 

     doWorkTwo(message); 
     channel.basicAck(tag, false); 
    } 

    private void doWorkTwo(String task) { 
     int c = 0; 
     int b = 5; 
     if (c < b) { 
      throw new AmqpRejectAndDontRequeueException(task); 
     } 
    } 
} 
+0

什么你的建议没有任何意义 - 因为你要发布到重试队列自己,经纪人并不知道,这是第二重试 - 它只是从它的角度来看,新的消息 - 所以不会采取任何不同的动作。 –

+0

谢谢@GaryRussell。我怎样才能完成经纪人可以承认第二次重试等情况。我已经尝试了几件事,但始终将计数设置为1. 如何自动发布retryQueue? – Diva04

+0

如果您拒绝并直接要求,经纪人只有一个迹象表明它被重新递交,而不是一个计数;如果你拒绝DLQ,那么经纪人会通过'x-death'头部继续计数 - 请参阅我的答案。如果您自己将其发布到DLQ,则需要通过自己的标题保持计数。 –

回答

0

如果拒绝消息,以便经纪人路由到一个DLQ,您可以检查x-death头。在这种情况下,我有一个TTL为5秒的DLQ,并且来自主队列的消息的使用者拒绝它;经纪人路由到DLQ,那么它过期并被路由回到主队列 - 在x-death头显示重路由操作的数量:

x-death header

+0

如何拒绝去DLQ的消息? 使用channel.reject将简单地从workQueue中删除消息本身。我需要在捕获块中以不同的方式处理它吗?或者是我的配置文件有什么缺失? – Diva04

+0

您必须使用“x-dead-letter-exchange”参数配置队列,并使用适当的路由密钥(如果交换类型需要一个)将您的DLQ绑定到该交换机上(或者使用扇出交换并将队列绑定到该交换机上)。请参阅[这里](https://www.rabbitmq.com/dlx.html)。 –

+0

@GaryRusselI,我添加factory.setDefaultRequeueRejected(假);到SimpleRabbitListenerContainerFactory Bean,失败的消息仍然不会来到DLQ(retryQueue)。我错过了什么来拒绝邮件,以便直接发送到DLQ? – Diva04

相关问题