2011-07-12 65 views
3

我有两个应用程序,app1.cs和app2.cs(代码如下)。此外,我还有一个DLL,我从refer.cs中提取(代码如下)。当我编译app1.cs(它发送一个测量对象)时,我得到以下异常:RabbitMQ和序列化奇怪的错误

Unhandled Exception: RabbitMQ.Client.Exceptions.OperationInterruptioedException 

我看不到连接如何中断。你看到问题在哪里引起?

问候, 黛咪

//refer.cs from which refer.dll is created 

using System; 
using System.IO; 
using System.Collections.Generic; 
using System.Runtime.Serialization; 
using System.Runtime.Serialization.Formatters.Binary; 

namespace refer 
{ 
    //start alternate serialization 
    public static class AltSerialization 
    { 
     public static byte[] AltSerialize(Measurement m) 
     { 
     using (var ms = new MemoryStream()) 
      { 
       var bf = new BinaryFormatter(); 
       bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple; 
       bf.Serialize(ms, m); 
       return ms.GetBuffer(); 
      } 
     } 

     public static Measurement AltDeSerialize(byte[] seriM) 
     { 
     using (var stream = new MemoryStream(seriM)) 
      { 
       BinaryFormatter bf = new BinaryFormatter(); 
       bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple; 
       return (Measurement)bf.Deserialize(stream);   
      } 
     } 
    } 
    //end alternte serialization 

    [Serializable] //This attribute sets class to be serialized 
    public class Measurement : ISerializable 
    {    
     [NonSerialized] public int id; 
     public int time; //timestamp 
     public double value; 

     public Measurement() 
     { 
      id = 1; 
      time = 12; 
      value = 0.01; 
     } 

     public Measurement(int _id, int _time, double _value) 
     { 
      id = _id; 
      time = _time; 
      value = _value; 
     } 

     //Deserialization constructor 
     public Measurement(SerializationInfo info, StreamingContext ctxt) 
     { 
      //Assign the values from info to the approporiate properties 
      Console.WriteLine("DeSerialization construtor called."); 
      time = (int)info.GetValue("MeasurementTime", typeof(int)); 
      value = (double)info.GetValue("MeasurementValue", typeof(double)); 
     } 

     //Serialization function 
     public void GetObjectData(SerializationInfo info, StreamingContext ctxt) 
     { 
      // Custom name-value pair 
      // Values must be read with the same name they're written  
      info.AddValue("MeasurementTime", time); 
      info.AddValue("MeasurementValue", value); 
     } 
    } 
} 

//MB1.cs

using System; 
using System.IO; 
using System.Collections.Generic; 
using System.Runtime.Serialization; 
using System.Runtime.Serialization.Formatters.Binary; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using UtilityMeasurement; 

public interface IMessageBus 
{  
string MsgSys  // Property 1 
{ 
    get; 
    set; 
} 

void write (Measurement m1); 
Measurement read(); 
void publish(string queue); 
void subscribe(string queue); 
} 

public class Rabbit : IMessageBus 
{ 
// Implementation of methods for Rabbit class go here 
private List<string> publishQ = new List<string>(); 
private List<string> subscribeQ = new List<string>(); 


public void write (Measurement m1) 
{ 
    byte[] body = Measurement.AltSerialize(m1); 

    IConnection connection = factory.CreateConnection(); 
    IModel channel = connection.CreateModel(); 

    foreach (string queue in publishQ) 
    { 
     channel.BasicPublish("", queue, null, body); 
     Console.WriteLine("\n [x] Sent to queue {0}.", queue); 
    } 
} 

public void publish(string queueName) 
{  
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true 
    publishQ.Add(queueName); //and, add it the list of queue names to publish to 
} 

public Measurement read() 
{ 
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
    foreach (string queue in subscribeQ) 
    { 
     channel.BasicConsume(queue, true, consumer); 
    } 
    System.Console.WriteLine(" [*] Waiting for messages." + 
          "To exit press CTRL+C"); 
    BasicDeliverEventArgs ea = 
     (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
    return Measurement.AltDeSerialize(ea.Body); 
} 

public void subscribe(string queueName) 
{ 
    channel.QueueDeclare(queueName, true, false, false, null); 
    subscribeQ.Add(queueName); 
} 

public static string MsgSysName; 
public string MsgSys 
{ 
    get 
    { 
     return MsgSysName; 
    } 
    set 
    { 
     MsgSysName = value; 
    } 
} 

public Rabbit(string _msgSys) //Constructor 
{ 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
    MsgSys = _msgSys; 
} 
} 

public class Zmq : IMessageBus 
{ 
public void write (Measurement m1) 
{ 
    // 
} 
public Measurement read() 
{ 
    // 
    return null; 
} 
public void publish(string queue) 
{ 
// 
} 
public void subscribe(string queue) 
{ 
//  
} 

public static string MsgSysName; 
public string MsgSys 
{ 
    get 
    { 
     return MsgSysName; 
    } 
    set 
    { 
     MsgSysName = value; 
    } 
} 

// Implementation of methods for Zmq class go here 
public Zmq(string _msgSys) //Constructor 
{ 
    System.Console.WriteLine("ZMQ"); 
    MsgSys = _msgSys; 
} 
} 

public class MessageBusFactory 
{ 
public static IMessageBus GetMessageBus(string MsgSysName) 
{ 
    switch (MsgSysName) 
    { 
     case "Zmq": 
      return new Zmq(MsgSysName); 
     case "Rabbit": 
      return new Rabbit(MsgSysName); 
     default: 
      throw new ArgumentException("Messaging type " + 
       MsgSysName + " not supported."); 
    } 
} 
} 

public class MainClass 
{ 
    public static void Main() 
    { 
    //Asks for the message system 
    System.Console.WriteLine("\nEnter name of messageing system: "); 
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); 
    string MsgSysName = (System.Console.ReadLine()).ToString(); 

    //Create a new Measurement message 
    Measurement m1 = new Measurement(2, 2345, 23.456); 

    //Declare an IMessageBus instance: 
    //Here, an object of the corresponding Message System 
     // (ex. Rabbit, Zmq, etc) is instantiated 
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    System.Console.WriteLine("With Test message:\n ID: {0}", m1.id); 
    System.Console.WriteLine(" Time: {0}", m1.time); 
    System.Console.WriteLine(" Value: {0}", m1.value); 

    // Ask queue name and store it 
    System.Console.WriteLine("Enter a queue name to publish the message to: "); 
    string QueueName = (System.Console.ReadLine()).ToString(); 
    obj1.publish(QueueName); 

    System.Console.WriteLine("Enter another queue name: "); 
    QueueName = (System.Console.ReadLine()).ToString(); 
    obj1.publish(QueueName); 

    // Write message to the queue 
    obj1.write(m1); 

} 
} 

//MB2.cs

using System; 
using System.IO; 
using System.Collections.Generic; 
using System.Runtime.Serialization; 
using System.Runtime.Serialization.Formatters.Binary; 
using RabbitMQ.Client; 
using RabbitMQ.Client.Events; 
using UtilityMeasurement; 

public interface IMessageBus 
{  
string MsgSys  // Property 1 
{ 
    get; 
    set; 
} 

void write (Measurement m1); 
Measurement read(); 
void publish(string queue); 
void subscribe(string queue); 
} 

public class Rabbit : IMessageBus 
{ 
// Implementation of methods for Rabbit class go here 
private List<string> publishQ = new List<string>(); 
private List<string> subscribeQ = new List<string>(); 


public void write (Measurement m1) 
{ 
    byte[] body = Measurement.AltSerialize(m1); 

    IConnection connection = factory.CreateConnection(); 
    IModel channel = connection.CreateModel(); 

    foreach (string queue in publishQ) 
    { 
     channel.BasicPublish("", queue, null, body); 
     Console.WriteLine("\n [x] Sent to queue {0}.", queue); 
    } 
} 

public void publish(string queueName) 
{  
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true 
    publishQ.Add(queueName); //and, add it the list of queue names to publish to 
} 

public Measurement read() 
{ 
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel); 
    foreach (string queue in subscribeQ) 
    { 
     channel.BasicConsume(queue, true, consumer); 
    } 
    System.Console.WriteLine(" [*] Waiting for messages." + 
          "To exit press CTRL+C"); 
    BasicDeliverEventArgs ea = 
     (BasicDeliverEventArgs)consumer.Queue.Dequeue(); 
    return Measurement.AltDeSerialize(ea.Body); 
} 

public void subscribe(string queueName) 
{ 
    channel.QueueDeclare(queueName, true, false, false, null); 
    subscribeQ.Add(queueName); 
} 

public static string MsgSysName; 
public string MsgSys 
{ 
    get 
    { 
     return MsgSysName; 
    } 
    set 
    { 
     MsgSysName = value; 
    } 
} 

public Rabbit(string _msgSys) //Constructor 
{ 
    ConnectionFactory factory = new ConnectionFactory(); 
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ"); 
    MsgSys = _msgSys; 
} 
} 


public class Zmq : IMessageBus 
{ 
public void write (Measurement m1) 
{ 
    // 
} 
public Measurement read() 
{ 
    // 
    return null; 
} 
public void publish(string queue) 
{ 
// 
} 
public void subscribe(string queue) 
{ 
//  
} 

public static string MsgSysName; 
public string MsgSys 
{ 
    get 
    { 
     return MsgSysName; 
    } 
    set 
    { 
     MsgSysName = value; 
    } 
} 

// Implementation of methods for Zmq class go here 
public Zmq(string _msgSys) //Constructor 
{ 
    System.Console.WriteLine("ZMQ"); 
    MsgSys = _msgSys; 
} 
} 

public class MessageBusFactory 
{ 
public static IMessageBus GetMessageBus(string MsgSysName) 
{ 
    switch (MsgSysName) 
    { 
     case "Zmq": 
      return new Zmq(MsgSysName); 
     case "Rabbit": 
      return new Rabbit(MsgSysName); 
     default: 
      throw new ArgumentException("Messaging type " + 
       MsgSysName + " not supported."); 
    } 
} 
} 

public class MainClass 
{ 
    public static void Main() 
    { 
    //Asks for the message system 
    System.Console.WriteLine("\nEnter name of messageing system: "); 
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]"); 
    string MsgSysName = (System.Console.ReadLine()).ToString(); 

    //Declare an IMessageBus instance: 
    //Here, an object of the corresponding Message System 
     // (ex. Rabbit, Zmq, etc) is instantiated 
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName); 

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName); 

    System.Console.WriteLine("Enter a queue to subscribe to: "); 
    string QueueName = (System.Console.ReadLine()).ToString(); 
    obj1.subscribe(QueueName); 

    //Create a new Measurement object m2 
    Measurement m2 = new Measurement(); 

    //Read message into m2 
    m2 = obj1.read(); 
    m2.id = 11; 
    System.Console.WriteLine("\nMessage received from queue {0}:\n ID: {1}",QueueName, m2.id); 
    System.Console.WriteLine(" Time: {0}", m2.time); 
    System.Console.WriteLine(" Value: {0}", m2.value); 
} 
} 

回答

2

我刚刚创建了一个香草C#VS2010控制台应用程序项目,同一个项目中的Refer.cs和App1.cs。

我进行了如下修改:

  • 新增RabbitMQ.Client.dll
  • 删除了的AssemblyVersion属性
  • 增补字串[] args到Main方法中App1.cs

此外,我更改了:

factory.HostName = "localhost"; 

要这样:

factory.HostName = "192.168.56.101"; 

即IP地址到我的Ubuntu的VirtualBox虚拟机上运行的RabbitMQ服务器。没有发生异常,并且服务器上已成功接收到消息。

所有迹象都指向给出的服务器配置。我的猜测是你的rabbitmq服务器根本没有运行,它不在本地主机上运行,​​或者与端口5672有某种连接问题。

+0

@KG我删除了AssemblyVersion属性,它工作。我在编译app1.cs和app2.cs时(我正在使用命令行)添加RabbitMQ.Client dll。所以,非常感谢!但是,我不明白为什么这可能是一个问题? – Demi

+0

@Demi项目AssemblyInfo中已经有一个AssemblyVersion属性。如果它存在于两个地方,你甚至不应该编译它。如果你的项目 - 并没有AssemblyInfo.cs,你就不会有冲突。你在使用Visual Studio吗? – karlgrz

+0

@KG会做。在类似的说明中,我将发送和接收代码封装到一个类中,创建了一个构造函数,并在此构造函数中放置了工厂,连接,通道的声明,希望每个app1和app2创建的对象都可以在连接。但是,当我尝试在订阅和发布方法中使用工厂,连接和通道时,出现错误,说它不知道这些属性。你如何将连接,主机名等公开给类,以便类中的方法利用这个声明? – Demi