2011-10-31 106 views
9

“持久”和“持久模式”似乎与重新启动有关,而不涉及没有订户接收消息。没有订阅者的RabbitMQ队列

我希望RabbitMQ在没有订户的情况下将消息保留在队列中。当用户上线时,该用户应收到该消息。 RabbitMQ可以吗?

代码示例:

服务器:

namespace RabbitEg 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        //channel.ExchangeDelete(EXCHANGE_NAME); 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 
        //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn); 

        for (int i = 0; i < 100; i++) 
        { 
         byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i); 
         IBasicProperties channelProps = channel.CreateBasicProperties(); 
         channelProps.SetPersistent(true); 

         channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad); 

         Console.WriteLine("Sent Message " + i); 
         System.Threading.Thread.Sleep(25); 
        } 

        Console.ReadLine(); 
       } 
      } 
     } 
    } 
} 

客户:

namespace RabbitListener 
{ 
    class Program 
    { 
     private const string EXCHANGE_NAME = "helloworld"; 

     static void Main(string[] args) 
     { 
      ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" }; 

      using (IConnection cn = cnFactory.CreateConnection()) 
      { 
       using (IModel channel = cn.CreateModel()) 
       { 
        channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true); 

        string queueName = channel.QueueDeclare("myQueue", true, false, false, null); 
        channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld"); 

        Console.WriteLine("Waiting for messages"); 

        QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
        channel.BasicConsume(queueName, true, consumer); 

        while (true) 
        { 
         BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
         Console.WriteLine(Encoding.ASCII.GetString(e.Body)); 
        } 
       } 
      } 
     } 
    } 
} 

回答

12

对于什么durablepersistent意味着一个解释,请参阅AMQP Reference

基本上,队列要么是durable要么是non-durable。前者经纪人重新生存,后者不。

消息发布为transientpersistent。这个想法是,队列上的persistent消息也应该能够在代理重新启动的情况下生存。

所以,要得到你想要的,你需要1)声明队列为durable和2)发布消息为persistent。另外,您可能还希望在频道上启用发布商确认;这样,你就会知道经纪人何时承担了该消息的责任。

+0

谢谢,我已经试过这个,但是如果没有客户端收听,它仍然不会保留消息。代码示例附加到问题。 –

+0

不错的代码。两个问题:1)服务器还应该*声明队列;声明它两次不是问题,这是一个好习惯,并且2)queueDeclare()为您提供了一个匿名的非持久队列;你想要queueDeclare(“myQueue”,true,false,false,null)。 – scvalex

+0

此外,您编辑问题的方式使您很难理解您要实现的目标。 – scvalex