2014-03-05 100 views
0

我有一个RabbitMQ SimpleRpcServer,我想知道如何正确处置它,当我完成它。该模式有一个主循环,直到它接收到消息,处理消息,然后再次阻止为止。这意味着为了打破循环,我必须发送一个特殊编码的消息,处理程序可以使用它来摆脱循环。如何正确处理SimpleRpcServer对象?

我读过RabbitMQ频道应该从创建它们的同一个线程访问。如果这是真的(我找不到源代码),这是否意味着要处理我的SimpleRpcServer,我将不得不创建一个专门用于将关闭消息发送到主循环的新通道?

这里的伪代码:

主循环(actual code here):

//I run this on a ThreadPool thread 
foreach (var evt in m_subscription) //blocks until a message appears 
{ 
    ProcessRequest(evt); 
} 

处理程序:

private void ProcessRequest(evt) 
{ 
    if(evt.BasicProperties.ContentType == "close") //close flag 
    { 
     base.Close(); 
    } 
    else 
    { 
     //process message 
    } 
} 

处置代码(具体创建一个新的信道用于消除主循环):

//I call this from the main thread 
public void Dipose() 
{ 
    var channel = new Channel(); 
    var props = new Properties("close"); //close flag 
    channel.BasicPublish(queueName, props); 
    channel.Dispose(); 
} 

请注意,我遗漏了一些初始化和处理代码。我知道这段代码不会编译。

回答

1

是的,你不应该在两个或多个线程之间共享通道,请阅读http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v3.2.4/rabbitmq-dotnet-client-3.2.4-user-guide.pdf“2.10。 IModel不应线程”

之间共享要关闭您的频道,你可以创建一个线程使用的信道,则关闭主线程上通道认购,如:

初始化连接,通道等。 。

ConnectionFactory connection_factory = new ConnectionFactory(); 
     IConnection conn = null; 
     IModel channel = null; 
     Subscription sub = null; 
     private void Connect() 
      try 
      { 
       conn = connection_factory.CreateConnection(); 
       channel = conn.CreateModel(); 
       sub = new Subscription(channel, your_queue, true); 
       StartSubscribeThread (sub); 
…. 

创建线程:

public void StartSubscribeThread(Subscription sub) 
{ 
    var t = new Thread(() => InternalStartSubscriber(sub)); 
    t.Start(); 
} 



private void InternalStartSubscriber(Subscription sub) 
{ 
    foreach (BasicDeliverEventArgs e in sub) 
    { 
    //Handle your messages 
    } 
} 

最后:

private void Disconnet() 
sub.Close(); 
channel.Close(); 
channel.Dispose(); 

用这种方法可以避免创建另一个通道来关闭第一个通道