2016-03-09 36 views
1

我正在我的Ubuntu工作站上运行一些测试。这些基准开始填充队列,其运行非常缓慢:我们可以加快通过RabbitMQ发布消息

import pika 
import datetime 

if __name__ == '__main__': 
    try: 
     connection = pika.BlockingConnection(pika.ConnectionParameters(
       host='localhost')) 
     channel = connection.channel() 

     channel.queue_declare(queue='hello_durable', durable=True) 
     started_at = datetime.datetime.now() 
     properties = pika.BasicProperties(delivery_mode=2) 
     for i in range(0, 100000): 
      channel.basic_publish(exchange='', 
            routing_key='hello', 
            body='Hello World!', 
            properties=properties) 
      if i%10000 == 0: 
       duration = datetime.datetime.now() - started_at 
       print(i, duration.total_seconds()) 
     print(" [x] Sent 'Hello World!'") 
     connection.close() 
     now = datetime.datetime.now() 
     duration = now - started_at 
     print(duration.total_seconds()) 
    except Exception as e: 
     print(e) 

发送10K消息需要超过30秒的时间。根据最高命令,工作站有12个内核,这些内核并不繁忙。有超过8Gb的空闲内存。队列是否耐用无关紧要。

我们如何加快发送消息?

+0

发布在多线程? – Gabriele

回答

0

从BlockingConnection切换到SelectConnection产生了巨大的差异,加速了该过程几乎五十次。我需要做的就是修改the following tutorial:的示例,发布消息循环:

import pika 

# Step #3 
def on_open(connection): 

    connection.channel(on_channel_open) 

# Step #4 
def on_channel_open(channel): 

    channel.basic_publish('test_exchange', 
          'test_routing_key', 
          'message body value', 
          pika.BasicProperties(content_type='text/plain', 
               delivery_mode=1)) 

    connection.close() 

# Step #1: Connect to RabbitMQ 
parameters = pika.URLParameters('amqp://guest:[email protected]:5672/%2F') 

connection = pika.SelectConnection(parameters=parameters, 
            on_open_callback=on_open) 

try: 

    # Step #2 - Block on the IOLoop 
    connection.ioloop.start() 

# Catch a Keyboard Interrupt to make sure that the connection is closed cleanly 
except KeyboardInterrupt: 

    # Gracefully close the connection 
    connection.close() 

    # Start the IOLoop again so Pika can communicate, it will stop on its own when the connection is closed 
    connection.ioloop.start() 
1

假设您不运行任何消费者These benchmarks start with populating a queue。 由于您只发布消息,因此rabbitmq切换为流状态。更确切地说,你的交易所和/或队列进入流动状态。从rabbitmq blog

引用这(大约)意味着客户端是被速率限制;它会 要发布速度更快,但服务器无法跟上

我敢肯定,如果你看够近,你会看到消息的第一部分(在初始设置,与空队列)速度很快,但发送速率在某个点急剧下降。

相关问题