2014-05-08 87 views
3

广播RabbitMQ的消息有没有办法使方法与ServiceStack

myMessageService.CreateMessageQueueClient().Publish(myMessage); 

广播消息给所有接收者?

回答

2

问题是,RegisterHandler<T>内部使用T类型来构建它所监听的队列名称。所以你唯一的机会就是利用自定义扇出交换到多个队列去掉踪有以下解决方案:

var fanoutExchangeName = string.Concat(QueueNames.Exchange, 
             ".", 
             ExchangeType.Fanout); 

在你的系统的某些时候你必须确保用下面的代码交换:所以

var message = new Message<T>(yourInstance); 
messageProducer.Publish(QueueNames<T>.In, // routing key 
         message,    // message 
         fanoutExchangeName); // exchange 

现在消息被发布到我们的交流,但我们需要队列绑定到:

var rabbitMqServer = new RabbitMqServer(); 
var messageProducer = (RabbitMqProducer) rabbitMqServer.CreateMessageProducer(); 
var channel = messageProducer.Channel; // you can use any logic to acquire a channel here - this is just a demo 
channel.ExchangeDeclare(fanoutExchangeName, 
         ExchangeType.Fanout, 
         durable: true, 
         autoDelete: false, 
         arguments: null); 

现在,我们可以将消息发布到该扇出在消耗部件,这是我们跟做了交流:

var rabbitMqServer = new RabbitMqServer(); 
var messageQueueClient = (RabbitMqQueueClient) rabbitMqServer.CreateMessageQueueClient(); 
var channel = messageQueueClient.Channel; // you just need to get the channel 

var queueName = messageQueueClient.GetTempQueueName(); 
channel.QueueBind(queueName,   // queue 
        fanoutExchangeName, // exchange 
        QueueName<T>.In); // routing key 

队列自动最后一个(只)消费者断开连接后删除,将无法生存的RabbitMQ重启。

现在的哈克部分是虽然听...

var consumer = new RabbitMqBasicConsumer(channel); 
channel.BasicConsume(queueName, 
        false, 
        consumer); 

Task.Run(() => 
      { 
       while (true) 
       { 
        BasicGetResult basicGetResult; 
        try 
        { 
         basicGetResult = consumer.Queue.Dequeue(); 
        } 
        catch (EndOfStreamException endOfStreamException) 
        { 
         // this is ok 
         return; 
        } 
        catch (OperationInterruptedException operationInterruptedException) 
        { 
         // this is ok 
         return; 
        } 
        catch (Exception ex) 
        { 
         throw; 
        } 
        var message = basicGetResult.ToMessage<T>(); 
        // TODO processing 
       } 
      }); 

该解决方案不提供任何自动重新连接,过滤器或其他的东西,虽然。

基本演练是available here

编辑: 让我想起了一件事:您可以使用ServiceStack.Messaging.MessageHandler<T>实例轻松提供回复和重试。

+0

哇,谢谢你这样的完整答案。我几乎忘记了原来的麻烦,但如果我没有弄错,我只是手动调整RabbitMq配置来做我想做的事情。 –

+0

@VictorSuzdalev你绝对欢迎!我觉得这个问题(因为它很常见)应该在ServiceStack内的封装解决方案中解决,尽管... –

+0

ServiceStack的RabbitMQ配置代码非常简单,所以我认为PR将受到欢迎! )) –