我第一次使用MQ并尝试使用RabbitMQ实现日志记录系统。我的实施涉及'发件人'在消费者正在收听消息之前,消费者没有收到来自MQ的消息
/*
* This class sends messages over MQ
*/
public class MQSender {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String[] levels = {"green", "orange", "red", "black"};
for (String log_level : levels) {
String message = "This is a " + log_level + " message";
System.out.println("Sending " + log_level + " message");
//publish the message with each of the bindings in levels
channel.basicPublish(EXCHANGE_NAME, log_level, null, message.getBytes());
}
channel.close();
connection.close();
}
}
它向我的每个颜色发送一条消息到交换机,其中颜色将用作绑定。它涉及一个“接收器”
public class MQReceiver {
private final static String EXCHANGE_NAME = "mm_exchange";
private final static String[] LOG_LEVELS = {"green", "orange", "red", "black"};
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
receiveMessagesFromQueue(2);
}
public static void receiveMessagesFromQueue(int maxLevel) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
/*
* Boilerplate stuff
*/
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//declare the exchange that messages pass through, type=direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//generate random queue
String queueName = channel.queueDeclare().getQueue();
//set bindings from 0 to maxLevel for the queue
for (int level = 0; level <= maxLevel; level++) {
channel.queueBind(queueName, EXCHANGE_NAME, LOG_LEVELS[level]);
}
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while(true) {
//waits until a message is delivered then gets that message
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}
这是考虑作为一个参数代表我想哪个颜色绑定它从交换供给一个数字。
在我的实现和一般RabbitMQ中,似乎邮件存储在交换机中,直到Consumer
请求它们,此时它们被分配到它们各自的队列,然后一次一个地发送到客户端(或MQ语言中的消费者)。我的问题是,当我在运行MQReceiver
类之前运行MQSender
类时,消息永远不会传递。但是,当我首先运行MQReceiver
类时,会收到消息。从我对MQ的理解中,我会认为这些消息应该存储在服务器上,直到类运行,然后这些消息应该传递给它们的消费者,但这不是发生的事情。我的主要问题是这些消息是否可以存储在交换机中,如果不存在,它们应该存储在何处,以便在消费者(即我的MQReceiver
类)被调用后交付它们?
感谢您的帮助!
只是一个猜测,但我怀疑你的'Sender'被丢弃由于缺乏注册'消费者' – StormeHawke
的消息你可能有autoAck设置为tru è?更多信息在这里:http://www.rabbitmq.com/tutorials/tutorial-two-java.html –
这也许? http://stackoverflow.com/questions/6386117/rabbitmq-use-of-immediate-and-mandatory-bits –