2016-12-14 25 views
2

立足我的建筑离这个例子:通道在RabbitMQ的空转与工作队列情景

RabbitMQ - Work Queues

设置:

  • 工人同时
  • 得到一个消息,每个劳工下载量文档,需要几秒钟
  • 工人成功下载文档后,它会确认邮件
  • 如果工人不能下载一个文件,它为noAcks重新排队(三级重试的最大值)

我期待到我的实现的瓶颈是造成减速的消息。因为我使用noAck来重新排队失败的工人。要启用此均匀地分布在我的工作线程我已经设置预取1.Looking在这个问题:RabbitMQ work queue is blocking consumers - 他们看到我所看到的在下面的截图:

acks/second

channels

要使确保工人一次只能分配一条消息,我需要将预取设置为1,但也有人说这会导致工人顺序工作而不是并行工作。

跑步在渠道层面究竟意味着什么?我看到队列和连接正常运行,但单个通道(每个线程一个)空闲。

编辑#1:关于将连接池传递给RabbitMQ连接的说明看起来很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我认为类似的方法可以用在这里:

 /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     //configure a large thread pool for the connection thread 
     int threads = 30; 
     logger.info(String.format("Configuring thread pool with %d threads", threads)); 
     ExecutorService connectionPool = Executors.newFixedThreadPool(threads); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     return cf; 
    } 

    @Bean 
    AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){ 
     AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     return rabbitTemplate; 
    } 

回答

1

Apparantly春季AMQP单个物理TCP/IP连接的默认线程池默认为5个线程:

Spring AMQP

此外,在写入时,所述的RabbitMQ-客户端库创建默认每个连接(5个螺纹)固定线程池。在使用大量连接时,应考虑在CachingConnectionFactory上设置自定义执行程序。然后,所有连接将使用相同的执行程序,并且它的线程可以共享。执行程序的线程池应该是无界的,或者为预期的利用率设置适当的值(通常每个连接至少有一个线程)。如果在每个连接上创建多个通道,则池大小将影响并发性,因此变量(或简单缓存)的线程池执行程序将是最合适的。

我能够通过改变分配给的RabbitMQ的连接池的线程数来复制此:

/** 
    * Expand the number of concurrent threads for a single RabbitMQ connection 
    * http://docs.spring.io/spring-amqp/reference/htmlsingle/ 
    * Also, at the time of writing, the rabbitmq-client library creates a fixed thread pool for each connection (5 threads) by default. 
    * When using a large number of connections, you should consider setting a custom executor on the CachingConnectionFactory. 
    */ 
    @Bean(name="channelPool") 
    @Scope("singleton") 
    MigrationPool rabbitConnectionPool(){ 
     int channels = 50; 
     logger.info(String.format("Configuring connection pool with %d threads", channels)); 
     return new MigrationPool(channels, channels, 0L, TimeUnit.MILLISECONDS, 
       new LinkedBlockingQueue<Runnable>()); 
    } 

    /** 
    * Configure a large thread pool for concurrent channels on the physical Connection 
    */ 
    @Bean 
    public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory(@Qualifier("channelPool") MigrationPool connectionPool) { 
     logger.info("Configuring connection factory"); 
     CachingConnectionFactory cf = new CachingConnectionFactory(); 
     cf.setAddresses(this.rabbitMQProperties.getAddresses()); 
     cf.setUsername(this.rabbitMQProperties.getUsername()); 
     cf.setPassword(this.rabbitMQProperties.getPassword()); 
     cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost()); 
     cf.setExecutor(connectionPool); 
     logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString())); 
     logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize())); 
     logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize())); 
     logger.info(String.format("MQ thread pool: %d threads", connectionPool.getMaximumPoolSize())); 
     return cf; 
    } 

在上面的代码我有每个连接线的镜像虚拟信道即数数每个虚拟RabbitMQ通道有一个真实的物理线程,因为每个通道都引用一个工作线程,每个线程处理一条消息。这将导致渠道不再阻塞默认的5个连接,而不是采取线程的数量扩张充分利用:

channels no longer blocking

操纵可用的RabbitMQ的连接将显示通道阻塞的线程数。例如,设置为10个线程并打开50个通道。