2016-05-15 14 views
1

负载平衡器接受传入的请求,将它们重新发送到多个服务器,并将服务器的答案返回给等待中的客户端。只有在VS调试器中设置断点时,C#服务器负载均衡器才能工作?

// Dispatcher.cs 
using System; 
using System.Collections.Generic; 
using System.Net; 
using System.Net.Sockets; 
using System.Threading; 

namespace LoadBallancer { 
    public class Dispatcher 
    { 
     // set the TcpListener on port 8890 
     int port = 8890; 
     TcpListener server; 
     List<CoreComm> processors = new List<CoreComm>(); 

     static void Main() 
     { 
      var dispatcher = new Dispatcher(); 
      dispatcher.ListenForRequests(); 
     } 

     public Dispatcher() 
     { 
      server = new TcpListener(IPAddress.Any, port); 
     } 

     public void ListenForRequests() 
     { 
      server.Start(); 
      while (true) 
      { 
       try 
       { 
        // Start listening for client requests 
        // Enter the listening loop 

        Console.Write("Waiting for a connection... "); 

        lock(server) 
        { 
         // Perform a blocking call to accept requests. 
         TcpClient client = server.AcceptTcpClient(); 

         Console.WriteLine("Connected."); 

         ThreadPool.QueueUserWorkItem(ThreadProc, client); 
        } 
       } 
       catch (Exception e) 
       { 
        Console.WriteLine("Exception: {0}", e); 
       } 
      } 
     } 
     private static void ThreadProc(object obj) 
     { 
      var processor = new CoreComm((TcpClient)obj); 
      processor.ReSendRequest(null); 
     } 
    } 
} 

// CoreComm.cs

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Net.Sockets; 

using System.Configuration; 
using System.Threading; 

namespace LoadBallancer 
{ 
    public class IamServer 
    { 
     public string Url { get; set; } 
     public int  Port { get; set; } 
     public string Type { get; set; } 
    } 

    public class CoreComm 
    { 
     // Buffer for reading data 
     int bufSize = 1024; 
     static List<IamServer> servers = new List<IamServer>(); 

     protected TcpClient acceptorSocket; 
     NetworkStream acceptorStream; 

     protected TcpClient clientSocket; 

     protected List<KeyValuePair<int, byte[]>> requestPackets = new List<KeyValuePair<int, byte[]>>(); 

     static CoreComm() 
     { 
      // reading config for servers' parameters 
     } 

     public CoreComm(TcpClient socket) 
     { 
      acceptorSocket = socket; 
      // Get a stream object for reading and writing 
      acceptorStream = acceptorSocket.GetStream(); 
     } 

     private void ReadFromAcceptorStream() 
     { 
      // Loop to receive all the data sent by the client. 
      while (acceptorStream.DataAvailable) 
      { 
       byte[] requestBuffer = new byte[bufSize]; 
       int i = acceptorStream.Read(requestBuffer, 0, requestBuffer.Length); 
       requestPackets.Add(new KeyValuePair<int, byte[]>(i, requestBuffer)); 
      } 
     } 

     public void ReSendRequest(Object threadContext) 
     { 
      ReadFromAcceptorStream(); 

      var servers = GetDestinationServers(null); 

      if (servers.Count == 0) 
       acceptorStream.Write(ErrMessage, 0, ErrMessage.Length); 
      else 
       // for debug only send the first in the list 
       SendRequestToServer(servers[0]); 

      // Shutdown and end connection 
      acceptorSocket.Close(); 
     } 

     public void SendRequestToServer(IamServer server) 
     { 
      clientSocket = new TcpClient(); 
      clientSocket.Connect(server.Url, server.Port); 
      NetworkStream clientStream = clientSocket.GetStream(); 

      foreach (var packet in requestPackets) 
       clientStream.Write(packet.Value, 0, packet.Key); 

      var requestBuffer = new byte[bufSize]; 

      while (clientStream.DataAvailable) 
      { 
       int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length); 
       acceptorStream.Write(requestBuffer, 0, i); 
      } 

      clientSocket.Close(); 
     } 

     // Mock up of the real load balancing algorithm 
     static int lastServerAnswered = 0; 

     public List<IamServer> GetDestinationServers(string requestData) 
     { 
      // processing to determine the query destinations 
      lock(servers) 
      { 
       // patch 
       int currentServerNum = lastServerAnswered; 
       lastServerAnswered ++ ; 
       if (lastServerAnswered > servers.Count - 1) 
        lastServerAnswered = 0; 

       return new List<IamServer> { servers[currentServerNum] }; 
      } 
     } 

    } 
} 

所以它的工作原理权当我设置断点的代码,并且不以其他方式使用。 任何想法?

回答

0

的问题被认为是代码:

while (clientStream.DataAvailable) 
{ 
     int i = clientStream.Read(requestBuffer, 0, requestBuffer.Length); 
     acceptorStream.Write(requestBuffer, 0, i); 
} 

实际上它发生了一些包clientStream.DataAvailable是即使有仍然剩余的数据被接收错误的。该解决方案是基于为所述负载平衡器已经开发的应用层协议,它在第一个4个字节是sent.The码的总的字节数随着所述流的发送如下:

var responseBuffer = new byte[bufSize]; 

int numTotalBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length); 
int numBytesToStream = GetNumBytesInTheStream(responseBuffer); 

acceptorStream.Write(responseBuffer, 0, numTotalBytesStreamed); 

while (numBytesToStream > numTotalBytesStreamed) 
{ 
    while (!clientStream.DataAvailable) 
     Thread.Sleep(1); 

     int numMoreBytesStreamed = clientStream.Read(responseBuffer, 0, responseBuffer.Length); 
     acceptorStream.Write(responseBuffer, 0, numMoreBytesStreamed); 
     numTotalBytesStreamed += numMoreBytesStreamed; 
} 
acceptorStream.Flush(); 
clientSocket.Close(); 

该解决方案可以工作,并且对于每秒数百个请求的连续加载非常稳定。