OpenStack使用RabbitMQ作为消息传递系统。有几个交换和队列用于这个目的。我发现用于消息传输的名为“nova”的交换类型为“topic”。 Exchange使用路由键将消息路由到队列(http://www.rabbitmq.com/tutorials/amqp-concepts.html)。 (http://www.rabbitmq.com/img/tutorials/intro/hello-world-example-routing.png的实用图片 - 没有足够的声望在此发布) OpenStack中有几个队列,如计算,证书,网络等。他们使用具有相同名称的路由密钥。因此,我使用这些路由键创建了几个新的队列,将它们与处理消息的消费者绑定。例如,有一个名为“compute”的队列使用名为“compute”的路由键。我创建了使用相同路由密钥的新队列“my_compute”。正如我认为它应该工作,我会得到消息。连接到OpenStack中的RabbitMQ代理
我有一些代码连接到交换,创建我的队列和消费者。
def connect(params):
connection = kombu.Connection(hostname=params['host'])
exchange = kombu.entity.Exchange(name=params['exchange_name'],
type=params['exchange_type'],
durable=params['exchange_durable'],
auto_delete=params['exchange_auto_delete'],
internal=params['exchange_internal'])
queue_list = []
for queue in params['queues_params']:
queue_list.append(kombu.messaging.Queue(name=queue['name'],
exchange=exchange,
routing_key=queue['routing_key'],
channel=connection.channel(),
durable=queue['durable'],
auto_delete=queue['auto_delete']))
consumer = kombu.messaging.Consumer(channel=connection.channel(),
queues=queue_list,
no_ack=True,
callbacks=[self._process_message])
consumer.consume()
return connection
争论 “PARAMS” 是从JSON文件中得到了地图:
{
"host" : "xxx",
"exchange_name" : "nova",
"exchange_type" : "topic",
"exchange_durable" : false,
"exchange_auto_delete" : false,
"exchange_internal" : false,
"queues_params" : [
{
"name" : "my_compute",
"routing_key" : "compute",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
{
"name" : "my_network",
"routing_key" : "network",
"durable" : false,
"auto_delete" : false,
"arguments" : [ ]
},
.
.
.
它的工作。但我只收到网络队列的消息。我不知道是否还有其他消息,但看起来好像有。我对吗?或者有什么不对?还有其他的消息,我怎样才能得到它们?
谢谢! Celiometer对我来说可能很有用。我会处理它。 –