我目前使用这个LIB进行压力测试,我已经建立了一个卡夫卡服务器:https://github.com/dsully/pykafka如何强制用python打开的套接字关闭?
import kafka
import time
def test_kafka_server(n=1):
for i in range(0,n):
producer = kafka.producer.Producer('test',host='10.137.8.192')
message = kafka.message.Message(str(time.time()))
producer.send(message)
producer.disconnect()
def main():
test_kafka_server(100000)
if __name__ == '__main__':
main()
什么只是最终发生的是,我最终超载我自己的本地机器。
我得到错误10055,根据谷歌意味着“Windows已经用尽了TCP/IP套接字缓冲区,因为太多的连接一次打开。”根据netstat,producer.disconnect()没有关闭套接字,而是将其置于TIME_WAIT
状态。
IPython的调试器指向该行:
C:\Python27\lib\socket.pyc in meth(name, self, *args)
222 proto = property(lambda self: self._sock.proto, doc="the socket protocol")
223
--> 224 def meth(name,self,*args):
225 return getattr(self._sock,name)(*args)
226
的罪魁祸首,但这似乎然后进入比我舒适与更低层级的事情搞乱。
我已经搜查,发现了这个Python socket doesn't close connection properly其中建议这样做:
setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
所以,我重建了pykafka LIB使用该选项在io.py文件:
def connect(self):
""" Connect to the Kafka server. """
global socket
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.connect((self.host, self.port))
,我仍然得到同样的错误。
我是不是把setsockopt线放在正确的位置?还有什么我可以尝试吗?
我没有看到它的解决办法:http://www.serverframework.com/asynchronousevents/2011/01/time-wait-and-its-design-implications-for-protocols-and-scalable-servers html的 – Will