2014-03-25 129 views
4

我期望能够在兔子队列之间移动消息(手动)。在队列之间移动消息rabbitMQ

例如:

first-queue has messages ['a','b','c','d','e','f'] 
second-queue has messages ['x','y'] 

我希望能够移动,例如从消息第一队列“a”到第二队列。这可以是手动操作。这两个队列都在同一个经纪人身上,我不想通过任何交换来发送他们。无论如何要做到这一点?我一直在玩rabbitmqctl,但似乎无法使它工作。我愿意接受任何其他工具,这些工具可以让我做到这一点。最终,我希望有某种消息选择器(例如,将所有消息的头部字段= X从第一个队列移动到第二个队列)。

我对rabbitmq和amqp仍然陌生,但一直无法找到如何做到这一点(如果它甚至可能)的文档。

谢谢。

回答

4

它没有记录的事实是因为它远离消息传递模型。

可以很容易地将消息发送到特定的队列 - 见tutorial #1例如 - 但读消息的唯一办法就是消耗他们,在券商向客户端发送的顺序。

不允许选择来自队列的消息,就像您可以使用SQL一样。

你可以做的是让客户(或最终,一个插件,但是这是一个高级主题)消耗从队列消息,并基于某些规则,你重新发布他们随后队列或另一个。

6

@Dax - 我只是在这里回答了这个同样的问题:Is it possible to move/merge messages between RabbitMQ queues?

我还有很长的描述存在。为了避免重复的内容,我不想复制/粘贴。

这听起来像你正在寻找的是rabbitmq铲子插件。

它内置的核心,只要启用此功能:

rabbitmq-plugins enable rabbitmq_shovel 
rabbitmq-plugins enable rabbitmq_shovel_management 

从管理部分在GUI中,你会发现一个简单的界面来创建铲。

查看其他帖子对我来说deets!

2

下面是一个简单的Java代码一切地从一个队列到另一个:

public void moveMessages(
      final String sourceQueueName, 
      final String targetQueueName, 
      final String rabbitmqHost, 
      final String rabbitmqUsername, 
      final String rabbitmqPassword, 
      final String rabbitmqVirtualHost 
) throws IOException { 

     // Initialize the consuming and publishing channel 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost(rabbitmqHost); 
     factory.setUsername(rabbitmqUsername); 
     factory.setPassword(rabbitmqPassword); 
     factory.setVirtualHost(rabbitmqVirtualHost); 
     Connection connection = factory.newConnection(); 

     Channel consumingChannel = connection.createChannel(); 
     Channel publishingChannel = connection.createChannel(); 


     while (true) { 
      // Get the first message in the queue (auto ack = false) 
      GetResponse response = consumingChannel.basicGet(sourceQueueName, false); 

      if (response == null) { 
       return; 
      } 

      BasicProperties properties = response.getProps(); 

      // Publish the message to the origin queue 
      publishingChannel.txSelect(); 
      publishingChannel.basicPublish("", targetQueueName, (AMQP.BasicProperties) properties, response.getBody()); 
      publishingChannel.txCommit(); 

      // Acknowledge the message in the dead letter queue 
      consumingChannel.basicAck(response.getEnvelope().getDeliveryTag(), false); 
     } 
    } 
相关问题