立足我的建筑离这个例子:通道在RabbitMQ的空转与工作队列情景
设置:
- 工人同时
- 得到一个消息,每个劳工下载量文档,需要几秒钟
- 工人成功下载文档后,它会确认邮件
- 如果工人不能下载一个文件,它为noAcks重新排队(三级重试的最大值)
我期待到我的实现的瓶颈是造成减速的消息。因为我使用noAck来重新排队失败的工人。要启用此均匀地分布在我的工作线程我已经设置预取1.Looking在这个问题:RabbitMQ work queue is blocking consumers - 他们看到我所看到的在下面的截图:
要使确保工人一次只能分配一条消息,我需要将预取设置为1,但也有人说这会导致工人顺序工作而不是并行工作。
跑步在渠道层面究竟意味着什么?我看到队列和连接正常运行,但单个通道(每个线程一个)空闲。
编辑#1:关于将连接池传递给RabbitMQ连接的说明看起来很有希望。 https://www.rabbitmq.com/api-guide.html#consumer-thread-pool我使用Spring AMQP,但我认为类似的方法可以用在这里:
/**
* Configure a large thread pool for concurrent channels on the physical Connection
*/
@Bean
public org.springframework.amqp.rabbit.connection.CachingConnectionFactory rabbitConnectionFactory() {
logger.info("Configuring connection factory");
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.rabbitMQProperties.getAddresses());
cf.setUsername(this.rabbitMQProperties.getUsername());
cf.setPassword(this.rabbitMQProperties.getPassword());
cf.setVirtualHost(this.rabbitMQProperties.getVirtualHost());
//configure a large thread pool for the connection thread
int threads = 30;
logger.info(String.format("Configuring thread pool with %d threads", threads));
ExecutorService connectionPool = Executors.newFixedThreadPool(threads);
cf.setExecutor(connectionPool);
logger.info(String.format("MQ cache mode: %s", cf.getCacheMode().toString()));
logger.info(String.format("MQ connection cache: %d", cf.getConnectionCacheSize()));
logger.info(String.format("MQ channel cache: %d", cf.getChannelCacheSize()));
return cf;
}
@Bean
AmqpTemplate rabbitTemplate(org.springframework.amqp.rabbit.connection.CachingConnectionFactory connectionFactory){
AmqpTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
return rabbitTemplate;
}