2011-05-10 205 views
7

我有两个彼此对话的节点进程。我会叫他们[节点服务器][节点发件人][节点发送者]不断处理信息并通过TCP连接将消息写入到[节点服务器][节点服务器]然后写回状态消息。发送时Node.js TCP服务器输入缓冲区

[Node Server].socket.on("data", function(p_data) { 
    this.write("OK"); 

    // Do some work with p_data 
} 

这工作没有问题,P_DATA总是包含“测试消息”:中[节点服务器]

var message = "Test Message"; 
[Node Sender].connection.write(message); 

实例:[节点发送者]的

例在任何时间超过5毫秒。但是,如果我速度[节点发送者]写每毫秒,P_DATA偶尔像“测试MessageTest MessageTes”结束。

据我所知,[节点发件人]中的缓冲区可能比写入命令发送它的速度快。有没有办法在发送消息时强制一对一的比率,同时仍然保持异步?

我当然可以只需添加终止我的消息,只需填写一个缓冲区[节点服务器],但我想,以确保有没有什么明显的我失踪。

回答

10

不,你不会错过任何东西,是的,你需要添加某种形式的终止消息。

这里有两个基本问题:

  1. TCP协议是面向流的,而不是面向消息的;它对可能构成“信息”的内容没有内在的知识。

  2. 由Node.js的网库解雇的数据事件表示一些数据已经到达,但没有什么消息可以包含任何想法,也不能表明它已收到任何具体的数据。

因此,通过发送消息的速度超过Node可以处理它们,套接字recv缓冲区将填充多个“消息”。

一个典型的解决这个问题是增加行终止,如可在 https://github.com/baudehlo/Haraka/blob/master/connection.js在线路32-34上找到:

self.client.on('data', function (data) { 
    self.process_data(data); 
}); 

和线110-134:

Connection.prototype.process_data = function (data) { 
    if (this.disconnected) { 
    logger.logwarn("data after disconnect from " + this.remote_ip); 
    return; 
    } 

    this.current_data += data; 
    this._process_data(); 
}; 

Connection.prototype._process_data = function() { 
    var results; 
    while (results = line_regexp.exec(this.current_data)) { 
    var this_line = results[1]; 
    if (this.state === 'pause') { 
     this.early_talker = 1; 
     var self = this; 
     // If you talk early, we're going to give you a delay 
     setTimeout(function() { self._process_data() }, this.early_talker_delay); 
     break; 
    } 
    this.current_data = this.current_data.slice(this_line.length); 
    this.process_line(this_line); 
    } 
}; 
+0

感谢您的答案和示例。 – user747454 2011-05-10 23:13:52

7

您需要累积传入的缓冲区数据以获取完整的消息。请参考下面的例子。这个服务器需要4个字节的头部和消息体。头部是无符号整数,这意味着身体的总长度,身体是带有分隔符'|'的字符串数据。请注意,此“标题和消息”可能一次没有收到。所以我们需要累积传入的数据,直到获得完整的数据。还有可能一次收到多个“标题和消息”。关键是我们需要数据积累。

var SERVER_PORT = 8124; 
var TCP_DELIMITER = '|'; 
var packetHeaderLen = 4; // 32 bit integer --> 4 

var server = net.createServer(function(c) { 
    var accumulatingBuffer = new Buffer(0); 
    var totalPacketLen = -1; 
    var accumulatingLen = 0; 
    var recvedThisTimeLen= 0; 
    var remoteAddress = c.remoteAddress; 
    var address= c.address(); 
    var remotePort= c.remotePort; 
    var remoteIpPort = remoteAddress +":"+ remotePort; 

    console.log('-------------------------------'+remoteAddress); 
    console.log('remoteIpPort='+ remoteIpPort); 

    c.on('data', function(data) { 
     console.log('received data length :' + data.length); 
     console.log('data='+ data); 

     recvedThisTimeLen = data.length; 
     console.log('recvedThisTimeLen='+ recvedThisTimeLen); 

     //accumulate incoming data 
     var tmpBuffer = new Buffer(accumulatingLen + recvedThisTimeLen); 
     accumulatingBuffer.copy(tmpBuffer); 
     data.copy (tmpBuffer, accumulatingLen ); // offset for accumulating 
     accumulatingBuffer = tmpBuffer; 
     tmpBuffer = null; 
     accumulatingLen += recvedThisTimeLen ; 
     console.log('accumulatingBuffer = ' + accumulatingBuffer ); 
     console.log('accumulatingLen =' + accumulatingLen); 

     if(recvedThisTimeLen < packetHeaderLen) { 
      console.log('need to get more data(less than header-length received) -> wait..'); 
      return; 
     } else if(recvedThisTimeLen == packetHeaderLen) { 
      console.log('need to get more data(only header-info is available) -> wait..'); 
      return; 
     } else { 
      console.log('before-totalPacketLen=' + totalPacketLen); 
      //a packet info is available.. 
      if(totalPacketLen < 0) { 
       totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
       console.log('totalPacketLen=' + totalPacketLen); 
      } 
     }  

     //while=> 
     //in case of the accumulatingBuffer has multiple 'header and message'. 
     while(accumulatingLen >= totalPacketLen + packetHeaderLen) { 
      console.log('accumulatingBuffer= ' + accumulatingBuffer); 

      var aPacketBufExceptHeader = new Buffer(totalPacketLen ); // a whole packet is available... 
      console.log('aPacketBufExceptHeader len= ' + aPacketBufExceptHeader.length); 
      accumulatingBuffer.copy(aPacketBufExceptHeader, 0, packetHeaderLen, accumulatingBuffer.length); // 

      //////////////////////////////////////////////////////////////////// 
      //process one packet data 
      var stringData = aPacketBufExceptHeader.toString(); 
      var usage = stringData.substring(0,stringData.indexOf(TCP_DELIMITER)); 
      console.log("usage: " + usage); 
      //call handler 
      (serverFunctions [usage])(c, remoteIpPort, stringData.substring(1+stringData.indexOf(TCP_DELIMITER))); 
      //////////////////////////////////////////////////////////////////// 

      //rebuild buffer 
      var newBufRebuild = new Buffer(accumulatingBuffer.length); 
      newBufRebuild.fill(); 
      accumulatingBuffer.copy(newBufRebuild, 0, totalPacketLen + packetHeaderLen, accumulatingBuffer.length ); 

      //init  
      accumulatingLen -= (totalPacketLen +4) ; 
      accumulatingBuffer = newBufRebuild; 
      newBufRebuild = null; 
      totalPacketLen = -1; 
      console.log('Init: accumulatingBuffer= ' + accumulatingBuffer); 
      console.log('  accumulatingLen = ' + accumulatingLen); 

      if(accumulatingLen <= packetHeaderLen) { 
       return; 
      } else { 
       totalPacketLen = accumulatingBuffer.readUInt32BE(0) ; 
       console.log('totalPacketLen=' + totalPacketLen); 
      }  
     } 
    }); 

    ... 
}); 

请参考下面的整个例子。

https://github.com/jeremyko/nodeChatServer

希望这有助于。