2013-08-22 149 views
1

我第一次使用MQ并尝试使用RabbitMQ实现日志记录系统。我的实施涉及'发件人'在消费者正在收听消息之前,消费者没有收到来自MQ的消息

/* 
* This class sends messages over MQ 
*/ 
public class MQSender { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     String[] levels = {"green", "orange", "red", "black"}; 
     for (String log_level : levels) { 
      String message = "This is a " + log_level + " message"; 
      System.out.println("Sending " + log_level + " message"); 
      //publish the message with each of the bindings in levels 
      channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes()); 
     } 

     channel.close(); 
     connection.close(); 
    } 
} 

它向我的每个颜色发送一条消息到交换机,其中颜色将用作绑定。它涉及一个“接收器”

public class MQReceiver { 
    private final static String EXCHANGE_NAME = "mm_exchange"; 
    private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"}; 

    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     receiveMessagesFromQueue(2); 
    } 

    public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { 
     /* 
     * Boilerplate stuff 
     */ 
     ConnectionFactory factory = new ConnectionFactory(); 
     factory.setHost("localhost"); 
     Connection connection = factory.newConnection(); 
     Channel channel = connection.createChannel(); 

     //declare the exchange that messages pass through, type=direct 
     channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

     //generate random queue 
     String queueName = channel.queueDeclare().getQueue(); 

     //set bindings from 0 to maxLevel for the queue 
     for (int level = 0; level <= maxLevel; level++) { 
      channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]); 
     } 

     QueueingConsumer consumer = new QueueingConsumer(channel); 
     channel.basicConsume(queueName, true, consumer); 

     while(true) { 
      //waits until a message is delivered then gets that message 
      QueueingConsumer.Delivery delivery = consumer.nextDelivery(); 
      String message = new String(delivery.getBody()); 
      String routingKey = delivery.getEnvelope().getRoutingKey(); 

      System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); 
     } 
    } 
} 

这是考虑作为一个参数代表我想哪个颜色绑定它从交换供给一个数字。

在我的实现和一般RabbitMQ中,似乎邮件存储在交换机中,直到Consumer请求它们,此时它们被分配到它们各自的队列,然后一次一个地发送到客户端(或MQ语言中的消费者)。我的问题是,当我在运行MQReceiver类之前运行MQSender类时,消息永远不会传递。但是,当我首先运行MQReceiver类时,会收到消息。从我对MQ的理解中,我会认为这些消息应该存储在服务器上,直到类运行,然后这些消息应该传递给它们的消费者,但这不是发生的事情。我的主要问题是这些消息是否可以存储在交换机中,如果不存在,它们应该存储在何处,以便在消费者(即我的MQReceiver类)被调用后交付它们?

感谢您的帮助!

+0

只是一个猜测,但我怀疑你的'Sender'被丢弃由于缺乏注册'消费者' – StormeHawke

+0

的消息你可能有autoAck设置为tru è?更多信息在这里:http://www.rabbitmq.com/tutorials/tutorial-two-java.html –

+0

这也许? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –

回答

1

如果RabbitMQ的路由密钥与绑定到交换机的任何队列不匹配,则丢弃消息。当您首先启动MQSender时,没有队列被绑定,因此它发送的消息将丢失。当你开始MQReceiver时,它将队列绑定到交换机,所以RabbitMQ有一个地方可以放置来自MQSender的消息。当您停止MQReceiver时,由于您创建了匿名队列,因此将从交换机中删除队列和所有绑定。

如果您希望邮件存储在服务器上,而MQReceiver未运行,则需要接收方创建命名队列,并将路由键绑定到该队列。请注意,创建一个已命名的队列是幂等的,如果该队列已经存在,则不会创建该队列。然后你需要接收者从指定的队列中取消消息。

更改您的代码看起来是这样的:

MQSender

.... 
String namedQueue = "logqueue"; 
//declare named queue and bind log level routing keys to it. 
//RabbitMQ will put messages with matching routing keys in this queue 
channel.queueDeclare(namedQueue, false, false, false, null); 
for (int level = 0; level < LOG_LEVELS.length; level++) { 
    channel.queueBind(namedQueue, EXCHANGE_NAME, LOG_LEVELS[level]); 
} 
... 

MQReceiver

... 
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); 

QueueingConsumer consumer = new QueueingConsumer(channel); 

//Consume messages off named queue instead of anonymous queue 
String namedQueue = "logqueue"; 
channel.basicConsume(namedQueue, true, consumer); 

while(true) { 
... 
相关问题