2012-11-08 42 views
3

我目前使用这个LIB进行压力测试,我已经建立了一个卡夫卡服务器:https://github.com/dsully/pykafka如何强制用python打开的套接字关闭?

import kafka 
import time 

def test_kafka_server(n=1): 
    for i in range(0,n): 
     producer = kafka.producer.Producer('test',host='10.137.8.192') 
     message = kafka.message.Message(str(time.time())) 
     producer.send(message) 
     producer.disconnect() 

def main(): 
    test_kafka_server(100000) 

if __name__ == '__main__': 
    main() 

什么只是最终发生的是,我最终超载我自己的本地机器。

我得到错误10055,根据谷歌意味着“Windows已经用尽了TCP/IP套接字缓冲区,因为太多的连接一次打开。”根据netstat,producer.disconnect()没有关闭套接字,而是将其置于TIME_WAIT状态。

IPython的调试器指向该行:

C:\Python27\lib\socket.pyc in meth(name, self, *args) 
    222  proto = property(lambda self: self._sock.proto, doc="the socket protocol") 
    223 
--> 224 def meth(name,self,*args): 
    225  return getattr(self._sock,name)(*args) 
    226 

的罪魁祸首,但这似乎然后进入比我舒适与更低层级的事情搞乱。

我已经搜查,发现了这个Python socket doesn't close connection properly其中建议这样做:

setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

所以,我重建了pykafka LIB使用该选项在io.py文件:

def connect(self): 
    """ Connect to the Kafka server. """ 
    global socket 
    self.socket = socket.socket() 
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    self.socket.connect((self.host, self.port)) 

,我仍然得到同样的错误。

我是不是把setsockopt线放在正确的位置?还有什么我可以尝试吗?

+1

我没有看到它的解决办法:http://www.serverframework.com/asynchronousevents/2011/01/time-wait-and-its-design-implications-for-protocols-and-scalable-servers html的 – Will

回答

5

您所描述的是套接字级别的正常TCP行为。当用户级程序关闭套接字时,内核不会立即释放套接字。它进入TIME_WAIT状态:

TIME-WAIT(无论是服务器或客户端)表示等待足够 时间通过,以确保远程TCP接收的 其连接终止请求的确认。 [根据RFC 793,连接 可以停留在TIME-WAIT中最多四分钟,称为MSL (最大段寿命)。

因此,套接字已关闭。 socket.SO_REUSEADDR用于侦听器(服务器),不影响客户端连接。那么,绑定套接字时真的很习惯。