2017-08-01 43 views
1

我已经创建了一个具有20个内核的指定线程池的连接。只有一个线程在执行器服务中同时运行,RabbitMQ

 ConnectionFactory factory = new ConnectionFactory(); 
     .... 
     //specified es 
     ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory); 
     con = factory.newConnection(consumerExecutor, addresses); 

然后从该连接建立频道:

 final Channel channel = connection.createChannel(); 

并使用它来创建DefaultConsumer。

虽然我发现尽管线程可以用来消费消息,但总是只有一个线程正在消费消息,即使消息在服务器中大量累积。

我看看源代码,并发现:

private final class WorkPoolRunnable implements Runnable { 

    @Override 
    public void run() { 
     int size = MAX_RUNNABLE_BLOCK_SIZE; 
     List<Runnable> block = new ArrayList<Runnable>(size); 
     try { 
      Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size); 
      if (key == null) return; // nothing ready to run 
      try { 
       for (Runnable runnable : block) { 
        runnable.run(); 
       } 
      } finally { 
       if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) { 
        ConsumerWorkService.this.executor.execute(new WorkPoolRunnable()); 
       } 
      } 
     } catch (RuntimeException e) { 
      Thread.currentThread().interrupt(); 
     } 
    } 
} 


/* Basic work selector and state transition step */ 
private K readyToInProgress() { 
    K key = this.ready.poll(); 
    if (key != null) { 
     this.inProgress.add(key); 
    } 
    return key; 
} 


/** 
* Return the next <i>ready</i> client, 
* and transfer a collection of that client's items to process. 
* Mark client <i>in progress</i>. 
* If there is no <i>ready</i> client, return <code><b>null</b></code>. 
* @param to collection object in which to transfer items 
* @param size max number of items to transfer 
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none. 
*/ 
public K nextWorkBlock(Collection<W> to, int size) { 
    synchronized (this) { 
     K nextKey = readyToInProgress(); 
     if (nextKey != null) { 
      VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey); 
      drainTo(queue, to, size); 
     } 
     return nextKey; 
    } 
} 

诀窍应该是ConsumerWorkService.this.workPool.nextWorkBlock,它轮询从就绪队列的通道,并运行回调后添加到终点块读队列run()。如果我错了,请纠正我。

这是令人困惑的,因为消费者绑定到一个通道,并且直到最后一个任务块完成时才将通道释放到队列中,这意味着线程池始终只为该消费者提供一个线程。

问题:

  1. 为什么RabbitMQ的设计,此模型
  2. 我们如何优化这一问题
  3. 是好到任务提交给一个独立的线程池handleDelivery消费消息以及ACK (确保消息ACK任务完成后,才)

回答

0

> 1.为什么RabbitMQ的设计这个模型

我想知道我自己的原因。但是这个事实清楚地体现在它们的documentation

每个通道都有它自己的调度线程。对于每个频道一个消费者最常见的使用案例 ,这意味着消费者不会阻止其他消费者购买其他 消费者。如果每个频道有多个消费者,请注意长时间运行的消费者可能会阻止向该频道上的其他 消费者发送回叫。

> 2.我们怎样优化这个问题

您可以有多个频道或处理提交实际工作到另一个线程池脱钩消息消费。你可以在this article找到更多的细节。

> 3。这是好到任务提交到handleDelivery一个独立的线程池消耗消息以及ACK(确保消息ACK任务完成后,才)

报价从docs

当手动确认是使用,重要的是考虑 什么线程确认。如果是从接收交付 线程不同(例如消费者#handleDelivery 委托交付办案,以不同的线程),与 承认设置为true多个参数是不安全的,并会导致 双重确认,因此通道级别协议 关闭通道的异常。确认一条消息在 时间可能是安全的。

相关问题