我有一个应用程序使用RabbitMQ作为消息队列来发送/接收两个组件之间的消息:发送者和接收者。发送者以非常快的方式发送消息。接收器接收到该消息,然后执行一些非常耗时的任务(主要是为非常大的数据大小编写数据库)。由于接收者需要很长时间才能完成任务,然后检索队列中的下一条消息,因此发送者将会继续快速填充队列。所以我的问题是:这会导致消息队列溢出吗?RabbitMQ:快速生产者和慢速消费者
的消息消费者看起来如下:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
消费者收到的每条消息都含有caseID。对于每个caseID,它会将大量数据保存到数据库,这需要很长时间。目前只有一个消费者为RabbitMQ设置,因为生产者/消费者使用相同的队列来发布/订阅caseID。那么如何加快消费者吞吐量,让消费者赶上生产者并避免队列中的消息溢出?我应该在消费者部分使用多线程来加快消费速度吗?还是应该使用多个消费者同时使用收到的消息?或者是否有任何异步方式让消费者异步使用消息而不等待完成?欢迎任何建议。
从[RabbitMQ的文档](http://www.rabbitmq.com/tutorials/tutorial-three-python.html),这里有两种方法:工作者队列,发布/订阅。我现在正在使用pub/sub模型。我应该使用工作者队列来代替多个消费者吗? – tonga 2014-10-28 20:30:46
对于你需要的应该是工作队列。这是如何实现https://github.com/victorpictor/Hotel/blob/master/Infrastructure/MessageTransport/Receivers/Subscriber.cs#L29 – voutrin 2014-10-28 20:35:53
但是,如果我想用几个队列用于不同的目的呢?现在,caseID消息只有一个队列。除caseID外,可能还有更多的数据。所以我可能需要使用发布/订阅模式来拥有多个队列。 – tonga 2014-10-28 20:42:29