我已经创建了一个具有20个内核的指定线程池的连接。只有一个线程在执行器服务中同时运行,RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
....
//specified es
ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
con = factory.newConnection(consumerExecutor, addresses);
然后从该连接建立频道:
final Channel channel = connection.createChannel();
并使用它来创建DefaultConsumer。
虽然我发现尽管线程可以用来消费消息,但总是只有一个线程正在消费消息,即使消息在服务器中大量累积。
我看看源代码,并发现:
private final class WorkPoolRunnable implements Runnable {
@Override
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
}
/* Basic work selector and state transition step */
private K readyToInProgress() {
K key = this.ready.poll();
if (key != null) {
this.inProgress.add(key);
}
return key;
}
/**
* Return the next <i>ready</i> client,
* and transfer a collection of that client's items to process.
* Mark client <i>in progress</i>.
* If there is no <i>ready</i> client, return <code><b>null</b></code>.
* @param to collection object in which to transfer items
* @param size max number of items to transfer
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
*/
public K nextWorkBlock(Collection<W> to, int size) {
synchronized (this) {
K nextKey = readyToInProgress();
if (nextKey != null) {
VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
drainTo(queue, to, size);
}
return nextKey;
}
}
诀窍应该是ConsumerWorkService.this.workPool.nextWorkBlock
,它轮询从就绪队列的通道,并运行回调后添加到终点块读队列run()
。如果我错了,请纠正我。
这是令人困惑的,因为消费者绑定到一个通道,并且直到最后一个任务块完成时才将通道释放到队列中,这意味着线程池始终只为该消费者提供一个线程。
问题:
- 为什么RabbitMQ的设计,此模型
- 我们如何优化这一问题
- 是好到任务提交给一个独立的线程池
handleDelivery
消费消息以及ACK (确保消息ACK任务完成后,才)