2013-08-23 177 views
0

我试图设置一个命名管道服务器和客户端在两个程序之间发送数据。 我的问题是,当我的数据被收回,例如。 BeginRead命令om服务器触发器在我从客户端序列化了一个对象后,它为同样的消息触发了20次回调。目标是客户端程序将向服务器程序发送命令。当服务器处理任务时,当有一个连接时,它会将状态更新发送回客户端。命名管道异步

这是我目前的测试程序。

class Program 
{ 
    static void Main(string[] args) 
    { 
     var server = new PipeServer(); 
     server.Init(); 

     var client = new PipeClient(); 
     if (client.Connect()) 
     { 
      Console.WriteLine("Connected to server."); 
     } 
     else 
     { 
      Console.WriteLine("Connection failed."); 
      return; 
     } 

     while (true) 
     { 
      Console.Write(" \\> "); 
      string input = Console.ReadLine(); 
      if (string.IsNullOrEmpty(input)) break; 

      var arr = input.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries); 
      int value = 0; 

      if (arr.Length != 2) break; 
      if (!int.TryParse(arr[1], out value)) break; 

      var obj = new PipeObject { Name = arr[0], Value = value }; 
      client.Send(obj); 

      //string result = f.Deserialize(client) as string; 
      //Console.WriteLine(result); 
     } 
    } 
} 

internal class PipeServer 
{ 
    IFormatter Formatter = new BinaryFormatter(); 
    public NamedPipeServerStream Instance { get; internal set; } 
    public bool IsConnected { get; internal set; } 
    byte[] buffer = new byte[65535]; 
    public object Message { get; set; } 

    StreamReader sr; 
    StreamWriter sw; 

    internal PipeServer() 
    { 
     IsConnected = false; 
    } 

    public void Init() 
    { 
     var ps = new PipeSecurity(); 
     ps.AddAccessRule(new PipeAccessRule(WindowsIdentity.GetCurrent().User, PipeAccessRights.FullControl, AccessControlType.Allow)); 
     ps.AddAccessRule(new PipeAccessRule(new SecurityIdentifier(WellKnownSidType.AuthenticatedUserSid, null), PipeAccessRights.ReadWrite, AccessControlType.Allow)); 

     Instance = new NamedPipeServerStream("Levscan4Pipe", PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous | PipeOptions.WriteThrough, 65535, 65535, ps); 

     sr = new StreamReader(Instance); 
     sw = new StreamWriter(Instance); 

     Instance.BeginWaitForConnection(OnClientConnected, Instance); 

     Thread t = new Thread(Run); 
     t.Start(); 
    } 

    void Run() 
    { 
     int index = 0; 
     if (IsConnected) 
     { 
      try 
      { 
       Instance.BeginRead(buffer, 0, buffer.Length, OnRead_Completed, Instance); 
       //index += Instance.Read(buffer, 0, buffer.Length); 
       //try 
       //{ 
       // using (var ms = new MemoryStream(buffer)) 
       // { 
       //  Message = Formatter.Deserialize(ms); 
       //  index = 0; 
       // } 
       //} 
       //catch (Exception e) 
       //{ 
       // Debug.WriteLine(e.Message); 
       // Debug.WriteLine(e.StackTrace); 
       //} 
      } 
      catch (IOException) 
      { 
       IsConnected = false; 
       Instance.Disconnect(); 
      } 
     } 

     Thread.Sleep(Timeout.Infinite); 
     //Instance.WaitForConnection(); 
     //Thread t = new Thread(Run); 
     //t.Start(); 
    } 

    void OnClientConnected(IAsyncResult ar) 
    { 
     Instance.EndWaitForConnection(ar); 
     IsConnected = true; 
    } 

    void OnRead_Completed(IAsyncResult ar) 
    { 
     var bytes = Instance.EndRead(ar); 
     Debug.WriteLine("{1} > Read completed - bytes read: {0}".FormatWith(bytes, DateTime.Now.ToString())); 

     //try 
     //{ 
     // using (var ms = new MemoryStream(buffer)) 
     // { 
     //  Message = Formatter.Deserialize(ms); 
     // } 
     //} 
     //catch (Exception e) 
     //{ 
     // Debug.WriteLine(e.Message); 
     // Debug.WriteLine(e.StackTrace); 
     //} 
    } 
} 

internal class PipeClient 
{ 
    IFormatter f = new BinaryFormatter(); 
    public NamedPipeClientStream Instance { get; internal set; } 
    StreamWriter sw; 
    StreamReader sr; 

    public PipeClient() 
    { 
     Instance = new NamedPipeClientStream(".", "Levscan4Pipe", PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough); 
     sr = new StreamReader(Instance); 
     sw = new StreamWriter(Instance); 
    } 

    public bool Connect() 
    { 
     try 
     { 
      Instance.Connect(5000); 
      Instance.ReadMode = PipeTransmissionMode.Message; 
      Instance.WaitForPipeDrain(); 
      return true; 
     } 
     catch 
     { 
      return false; 
     } 
    } 

    public void Send(object obj) 
    { 
     f.Serialize(Instance, obj); 
     Instance.Flush(); 
     Instance.WaitForPipeDrain(); 
    } 
} 

编辑

改变了while循环到如果开始的BeginRead。这解决了多个回调,但我仍然没有得到一个完整的消息。

+2

我建议你阅读文档。例如,在流上调用'Read'将返回读取的字节数。它看起来像你假设你会在一次阅读中阅读你的整个信息,这不一定是这样。 –

+0

我得到在我的回调中读取的字节数,但我不能得到消息的字节总数。我的NamedPipeServerStream对象不支持长度和位置。 也想通了,我不能使用一段时间(IsConnected),每次调用BeginRead函数,导致我的回调同时运行多次。 – Azazel

+0

如果我序列化我的对象到一个内存流,然后从中获取字节并将它们全部发送到pipestreams中写入方法我在回调中直接获取消息并且它可以工作。为什么不用我的pipestream直接序列化得到相同的结果? – Azazel

回答

1

如果服务器像写入流:

write field 1 
write field 2 
write field 3 
etc. 

还有就是写之间的一段时间,接收器(程序)可以读取前三个字段,而服务器还在写其他。管道流不知道服务器何时完成写入,因此它不能缓冲所有内容并将其全部发送给您。

当服务器首先将所有内容写入内存流,然后将内存流复制到管道流中时,您的程序可以一次完成所有内容。也许。如果服务器正在发送一个非常大的数据包,则可能只是读取其中的一部分。

管道流只是一个字节流。它不会对数据施加任何格式。它没有任何记录或任何类似的概念。因此,您必须像字节流一样对待它,并自己编写记录等。

如果您需要知道从服务器发送的记录大小,服务器必须将该信息放入流中为你。通常,服务器将写入长度,然后写入数据。接收器可以读取长度,将其转换为整数,然后从流中读取许多字节。而且,是的,它可能需要多次读取才能获得所有的字节。这只是字节流的性质。

处理这个问题的另一种方法是创建一个记录结束标记。所以服务器发送它的数据并且你的程序读取,直到找到表示记录结束的字节序列。不过,您必须小心,因为服务器可能会发送多个记录,您的读取可能会抓取一条记录的结尾以及下一条记录的结尾。

使用字节流可能会有很多工作,因为您必须在读取字节后重新构建记录。如果可以的话,使用现有框架(如WCF,如其中一个评论中所述)更容易。