2015-09-23 37 views
0

我有500多个单元连接到我的服务器并转储他们的数据。到目前为止,我一直在使用PHP脚本来充当套接字监听器,但是随着负载的增加和PHP无法跟上,我需要使用多线程。我对Python很陌生,决定将它用于我的新平台,在过去的几天中,我一直在努力尝试并尝试了很多例子,但都无济于事。通过我的搜索,我遇到了一些试图回答这个问题的问题,但都没有。我会附上我的代码。Python多线程套接字侦听器错误,线程未发布

问题:当单元连接到服务器并且服务器接受它时,服务器创建一个新的线程来处理连接,当单元断开线程保持打开和活动的连接时,问题出现,总线程计数增长,并且这与系统限制相关:“打开文件的数量”,我可以增加这个限制,但是这只能使它成为一个定时炸弹,它并不能解决这个问题。

请帮忙。

#! /usr/bin/python 

import multiprocessing 
import socket 
import sys 
import pprint 
import datetime 
import MySQLdb 
import time 
import datetime 
import re 
import select 
import resource 
import threading 

max_connections = 1024 
max_connections_set = max_connections 
resource.setrlimit(resource.RLIMIT_NOFILE, (max_connections_set, max_connections_set)) 



#the incomming port 
the_global_port = xxxx #(any port) 

#display id 
the_global_id = "UNIT TYPE" 




class ConnectionObject(object): 
    the_id = None 

    the_socket = None 
    the_socket_address = None 
    the_socket_address_ip = None 
    the_socket_address_port = None 

    the_server = None 

    the_process = None 
    the_process_id = None 
    the_process_name = None 

    the_imei = None 
    identifier = "" 

    # The class "constructor" - It's actually an initializer 
    def __init__(self, in_process_nr, in_process , in_socket , in_socket_address, in_server): 
    self.the_id = in_process_nr 
    self.the_process = in_process 
    self.the_process_id = self.the_process.exitcode 
    self.the_process_name = self.the_process.name 


    self.the_socket = in_socket 
    self.the_socket_address = in_socket_address 
    self.the_socket_address_ip = self.the_socket_address[0] 
    self.the_socket_address_port = self.the_socket_address[1] 


    self.the_server = in_server 

    self.identifier = str(self.the_id) + " " + str(self.the_process_name) + " " + str(self.the_socket_address_ip) + " " + str(self.the_socket_address_port) + " " 
    #end def init 
#end def class 

def processData(the_connection_object , the_data): 
    def mysql_open_connection_inside(): 
    try: 
     the_conn = MySQLdb.connect(host= "127.0.0.1", 
          user="user", 
          passwd="password", 
          db="mydb") 
    except MySQLdb.Error, e: 
     print "Error %d: %s" % (e.args[0], e.args[1]) 
     time.sleep(30) 
     try: 
     the_conn = MySQLdb.connect(host= "127.0.0.1", 
          user="user", 
          passwd="password", 
          db="mydb") 
     except MySQLdb.Error, e: 
     print "Error %d: %s" % (e.args[0], e.args[1]) 
     print "Unexpected error:", sys.exc_info()[0] 
     raise 
     sys.exit(0) 
     #end time 2 
    #end try except 

    return the_conn 
    #end def mysql_open_connection      



    conn = mysql_open_connection_inside() 
    x = conn.cursor() 

    add_rawdata = ("INSERT INTO RawData" 
       "(ID,RawData,Type) " 
       "VALUES (%s, %s, %s)") 

    data_raw = ('123456', 'ABCD','') 



    records_inserted = 0 


    the_connection_object.the_imei = "" 


    #global clients 
    imei = "" 
    try: 
    thedata = "" 
    thedata = " ".join("{:02x}".format(ord(c)) for c in the_data) 

    record_to_save = ' '.join(thedata) 


    seqtoreply = "" 
    seqtoreply = "OK" 

    #reply part 
    if (seqtoreply != ""): #reply to unit 
     try: 
     the_connection_object.the_socket.sendall(seqtoreply) 
     #echoout(the_connection_object.identifier+"We Replyed With : " + seqtoreply) 
     except: 
     echoout(the_connection_object.identifier+"Send Reply Error : " + str(sys.exc_info()[1])) 
     #end except 
    #end of if 


    the_id = "some generated id" 
    data_raw = (the_id, werk_data, 'UNIT') 
    try: 
     x.execute(add_rawdata, data_raw) 
     conn.commit() 
     echoout(the_connection_object.identifier+"Raw Data Saved.") 
    except: 
     conn.rollback() 
     echoout(the_connection_object.identifier+" Raw Data NOT Saved : " + str(sys.exc_info()[1])) 
    #end of data save insert 
    #echoout("=============================") 
    endme = 1 
    echoout("") 
    conn.close() 
    #end try 
    except: 
    conn.close() 
    echoout(the_connection_object.identifier+"Error : " + str(sys.exc_info()[1])) 
    #end try except 
#end def handel function 



def handle_p(processnr, server, connection, address): 
    this_connection = ConnectionObject(processnr,multiprocessing.current_process(), connection, address, server) 
    thisprocess = multiprocessing.current_process() 
    this_connection.the_id = "" 
    the_address = this_connection.the_socket_address_ip 
    the_port = this_connection.the_socket_address_port 

    try: 
    echoout("New connection from : "+str(the_address)+" on port "+str(the_port)) 
    close_the_socket = False 
    while True: 
     #--------------------- recive part ------------------------------------------------- 
     data = connection.recv(512) 
     thedata = "" 
     thedata = " ".join("{:02x}".format(ord(c)) for c in data) 
     if ((thedata == "") or (thedata == " ") or (data == False)): 
     echoout("Socket Closed Remotely : No Data") 
     close_the_socket = True 
     break 
     #end - if data blank 
     else : 
     processData(this_connection, data) 
     #end there is data 
     echoout("=============================") 
    #end if while true 
    #end try 
    except: 
    print "handling request, Error : " + str(sys.exc_info()[1]) 
    close_the_socket = True 
    connection.close() 
    finally: 
    close_the_socket = True 
    echoout("Closing socket") 
    connection.close() 
    #end try finally 
#end def handel function 

def mysql_update(update_statement, update_values): 
    conn_update = MySQLdb.connect(host= "127.0.0.1", 
        user="user", 
        passwd="password", 
        db="mydb") 
    x_update = conn_update.cursor(MySQLdb.cursors.DictCursor) 

    rawdata_data = (update_statement) 
    data_rawdata = (update_values) 
    allupdateok = False 
    #end if there is more 
    try: 
    x_update.execute(rawdata_data, data_rawdata) 
    conn_update.commit() 
    allupdateok = True 
    conn_update.close() 
    except: 
    conn_update.rollback() 
    allupdateok = False 
    conn_update.close() 
    print "Unexpected error:", sys.exc_info()[0] 
    raise 
    #end of data save insert 
    if (allupdateok == False): 
    echoout("Update Raw Data Table Error") 
    #end if update 
    return allupdateok 
#end def mysqlupdate 



def echoout(thestring): 
    datestring = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") 
    if (thestring != ""): 
    outstring = datestring + " " + thestring 
    print outstring 
    else : 
    outstring = thestring 
    print outstring 
#end - def echoout 




class Server(object): 
    threads = [] 
    all_threads = [] 
    high_proc = "" 
    def __init__(self, hostname, port): 
    self.hostname = hostname 
    self.port = port 

    def start(self): 
    echoout("Listening for conncetions") 
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    self.socket.bind((self.hostname, self.port)) 
    self.socket.listen(10) 
    process_number = 1 

    inputs = [self.socket] 
    while True: 
     inready, outready, excready = select.select(inputs, [], [], 30); 
     for s in inready: 
     if s == self.socket: 
      conn, address = self.socket.accept() 
      high_processname = "" 
      echoout("Got a connection...") 
      process = threading.Thread(target=handle_p, args=(process_number,self, conn, address)) 
      high_processname = process.name 
      self.high_proc = high_processname 
      process.daemon = True 
      process.start() 
      self.all_threads.append((process,conn)) 
      ts = time.time() 
      st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') 
      self.threads.append((process_number,conn,st,0,process)) 
      process_number = process_number + 1 

      print "ACTIVE Threads = " + str(threading.activeCount()) 

      the_total_count = 0 
      dead_count = 0 
      alive_count = 0 
      for the_thread in self.all_threads : 
      if (the_thread[0].is_alive() == True): 
       alive_count = alive_count + 1 
      else : 
       dead_count = dead_count + 1 
       the_thread[1].close() 
       the_thread[0].join(0.3) 

       self.all_threads.pop(the_total_count) 
      #end if alive else 
      the_total_count = the_total_count + 1 
      #end for threads 

      print "LIVE Threads = " + str(alive_count) 
      print "DEAD Threads = " + str(dead_count) 
      print "TOTAL Threads = " + str(the_total_count) 
      print "" 


     #end if s = socke, new connection 
     #end for loop 
    #end while truw 
    self.socket.close() 
    #end def start 

#main process part 
if __name__ == "__main__": 

    start_ip = "0.0.0.0" 
    start_port = the_global_port 

    #start server 
    server = Server(start_ip, start_port) 
    try: 
    print "Listening on " , start_port 
    server.start() 
    except: 
    print "unexpected, Error : " + str(sys.exc_info()[1]) 
    finally: 
    print "shutting down" 

    active_clients = 0 
    for process in multiprocessing.active_children(): 
     try: 
     active_clients = active_clients + 1 
     process.terminate() 
     #process.join() 
     except: 
     print "Process not killed = " + str(sys.exc_info()[1]) 
     #end try except 
    #close mysql connection 
    print "Active clients = " + str(active_clients) 
    #end try finally 
    server.socket.close() 
    server.threads = [] 
    server = None 
    print "All done." 
#end def main 
+0

'当单元断开连接,线程保持打开并处于活动状态并且总线程数增加时,问题出现'为什么不关闭线程? http://stackoverflow.com/a/325528/2382792 –

回答

0

首先,这是愚蠢的使用线程的时候可以有500多个连接的客户端,你应该去异步 - 看看gevent例如用于 一个非常好的图书馆,或至少使用select(见Python文档)。

然后,你的代码以关闭插座handle_p看起来不错,确实当 的recv()电话回来了一个空字符串这意味着远程 端断开,因此你打破while,罚款。

但是,它看起来像遥控关闭了连接,但它不是 在您身边检测到(recv()不返回)。最好是 然后有一种心跳,知道什么时候你可以关闭 连接。

+0

谢谢,我现在试图gevent,仍然得到同样的问题,我还包括对死亡连接recv()的检测,仍然没有运气 – Pierrie

+0

请显示新的代码;如何检测recv上的死亡连接?有超时? – mguijarr