2013-11-22 57 views
2

我正在为我的大学项目做一个客户端 - 服务器项目, 我们必须分配登录到客户端。socket编程问题 - python

客户端系统将每2秒钟请求其状态(以检查客户端是锁定还是解锁)。服务器将接受客户端请求并将客户端状态回复给系统。

但问题是服务器线程没有响应客户端请求。

客户端线程:

def checkPort(): 
    while True: 
    try: 
     s = socket.socket()  
     s.connect((host, port))  
     s.send('pc1')     # send PC name to the server 
     status = s.recv(1024)   # receive the status from the server 

     if status == "unlock": 
      disableIntrrupts()  # enable all the functions of system 
     else: 
      enableInterrupts()  # enable all the functions of system 

     time.sleep(5)     
     s.close() 

    except Exception: 
     pass 

服务器线程:

def check_port(): 
    while True: 
     try: 
     print "hello loop is repeating" 
     conn, addr = s.accept() 
     data = conn.recv(1024) 

     if exit_on_click == 1: 
      break 
     if (any(sublist[0] == data for sublist in available_sys)): 
      print "locked" 
      conn.send("lock") 
     elif (any(sublist[0] == data for sublist in occupied_sys)): 
      conn.send("unlock") 
      print "unlocked" 
     else: 
      print "added to gui for first time" 
      available_sys.append([data,addr[0],nameText,usnText,branchText]) 
      availSysList.insert('end',data) 
     except Exception: 
      pass 

但我的问题是服务器线程不执行超过2次, 因此,它无法接受客户端的请求超过一次。 我们不能使用单个服务器套接字来处理多个客户端套接字吗? 如何处理来自服务器的多个客户端请求?

感谢您的任何帮助!

回答

2

这里是实现线程服务器的例子。它功能完善,并具有使用SSL的好处。此外,我使用线程事件对象在将接收到的数据存储在数据库中之后发出另一个类对象的信号。

请注意,_sni和_cams_db是纯粹属于我自己的附加模块。如果你想看到_sni模块(为pyOpenSSL提供SNI支持),请告诉我。

接下来是什么,是camsbot.py的一个片段,还有很多远远超出了这个问题的范围。我建立的是一个集中的信息中继系统。它侦听tcp/2345并接受SSL连接。每个连接都将消息传递到系统中。短暂的连接将连接,传递消息,并断开连接。连接后,长时间连接会传递大量消息。消息存储在数据库中,threading.Event()对象(连接到数据库类)设置为通知机器人轮询数据库以查找新消息并中继它们。

下面的例子示出了

  • 如何设置的螺纹TCP服务器
  • 如何从收听信息传递给接受处理程序如配置数据和等

除了,这个例子还显示

  • 如何使用SSL套接字
  • 如何做一些基本的证书验证
  • 如何干净地包裹并从TCP套接字解开SSL
  • 如何插座,而不是在使用poll()选择()

db.pending是_cams_db.py中的threading.Event()对象

在主进程中,我们使用db.pending.wait()启动另一个等待挂起对象的线程。这使得该线程一直等到另一个线程执行db.pending.set()。一旦设置,我们的等待线程立即醒来并继续工作。当我们等待线程完成,它调用db.pending.clear(),并返回到循环的开始,并开始与db.pending.wait()从camsbot.py

while True: 
    db.pending.wait() 
    # after waking up, do code. for example, we wait for incoming messages to 
    # be stored in the database. the threaded server will call db.pending.set() 
    # which will wake us up. we'll poll the DB for new messages, relay them, clear 
    # our event flag and go back to waiting. 
    # ... 
    db.pending.clear() 

片段再次等待:

import sys, os, sys, time, datetime, threading, select, logging, logging.handlers 
import configparser, traceback, re, socket, hashlib 
# local .py 
sys.path.append('/var/vse/python') 
import _util, _webby, _sni, _cams_db, _cams_threaded_server, _cams_bot 
# ... 

def start_courier(config): 
    # default values 
    host = '::' 
    port = 2345 

    configp = config['configp'] 

    host = configp.get('main', 'relay msp hostport') 

    # require ipv6 addresses be specified in [xx:xx:xx] notation, therefore 
    # it is safe to look for :nnnn at the end 
    if ':' in host and not host.endswith(']'): 
     port = host.split(':')[-1] 
     try: 
      port = int(port, 10) 
     except: 
      port = 2345 
     host = host.split(':')[:-1][0] 

    server = _cams_threaded_server.ThreadedTCPServer((host, port), _cams_threaded_server.ThreadedTCPRequestHandler, config) 
    t = threading.Thread(target=server.serve_forever, name='courier') 
    t.start() 

_cams_threaded_server.py:

import socket, socketserver, select, datetime, time, threading 
import sys, struct 

from OpenSSL.SSL import SSLv23_METHOD, SSLv3_METHOD, TLSv1_METHOD, OP_NO_SSLv2 
from OpenSSL.SSL import VERIFY_NONE, VERIFY_PEER, VERIFY_FAIL_IF_NO_PEER_CERT, Context, Connection 
from OpenSSL.SSL import FILETYPE_PEM 
from OpenSSL.SSL import WantWriteError, WantReadError, WantX509LookupError, ZeroReturnError, SysCallError 
from OpenSSL.crypto import load_certificate 
from OpenSSL import SSL 

# see note at beginning of answer 
import _sni, _cams_db 

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): 
    def __init__(self, server_address, HandlerClass, config): 
     socketserver.BaseServer.__init__(self, server_address, HandlerClass) 
     self.address_family = socket.AF_INET6 
     self.connected  = [] 
     self.logger   = config['logger'] 
     self.config   = config 

     self.socket = socket.socket(self.address_family, self.socket_type) 
     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

     sc = Context(TLSv1_METHOD) 
     sc.set_verify(VERIFY_PEER|VERIFY_FAIL_IF_NO_PEER_CERT, _sni.verify_cb) 
     sc.set_tlsext_servername_callback(_sni.pick_certificate) 
     self.sc = sc 

     self.server_bind() 
     self.server_activate() 

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): 
    def handle(self): 
     config   = self.server.config 
     logger   = self.server.logger 
     connected  = self.server.connected 
     sc    = self.server.sc 

     try: 
      self.peer_hostname = socket.gethostbyaddr(socket.gethostbyname(self.request.getpeername()[0]))[0] 
     except: 
      self.peer_hostname = '!'+self.request.getpeername()[0] 

     logger.info('peer: {}'.format(self.peer_hostname)) 

     ssl_s = Connection(sc, self.request) 
     ssl_s.set_accept_state() 

     try: 
      ssl_s.do_handshake() 
     except: 
      t,v,tb = sys.exc_info() 
      logger.warn('handshake failed {}'.format(v)) 

     ssl_s.setblocking(True) 
     self.ssl_s = ssl_s 

     try: 
      peercert = ssl_s.get_peer_certificate() 
     except: 
      peercert = False 
      t,v,tb = sys.exc_info() 
      logger.warn('SSL get peer cert failed: {}'.format(v))     

     if not peercert: 
      logger.warn('No peer certificate') 

     else: 
      acl = config['configp']['main'].get('client cn acl', '').split(' ') 
      cert_subject = peercert.get_subject().CN 
      logger.info('Looking for {} in acl: {}'.format(cert_subject,acl)) 

      if cert_subject in acl: 
       logger.info('{} is permitted'.format(cert_subject)) 
      else: 
       logger.warn('''client CN not approved''') 

     # it's ok to block here, every socket has its own thread 
     ssl_s.setblocking(True) 

     self.db = config['db'] 
     msgcount = 0 

     p = select.poll() 
     # don't want writable, just readable 

     p.register(self.request, select.POLLIN|select.POLLPRI|select.POLLERR|select.POLLHUP|select.POLLNVAL) 
     peername = ssl_s.getpeername() 

     x = peername[0] 
     if x.startswith('::ffff:'): 
      x = x[7:] 
     peer_ip = x 

     try: 
      host = socket.gethostbyaddr(x)[0] 
     except: 
      host = peer_ip 

     logger.info('{}/{}:{} connected'.format(host, peer_ip, peername[1])) 
     connected.append([host, peername[1]]) 

     if peercert: 
      threading.current_thread().setName('{}/port={}/CN={}'.format(host, peername[1], peercert.get_subject().CN)) 
     else: 
      threading.current_thread().setName('{}/port={}'.format(host, peername[1])) 

     sockclosed = False 
     while not sockclosed: 
      keepreading = True 

      #logger.debug('starting 30 second timeout for poll') 
      pe = p.poll(30.0) 
      if not pe: 
       # empty list means poll timeout 
       # for SSL sockets it means WTF. we get an EAGAIN like return even if the socket is blocking 
       continue 

      logger.debug('poll indicates: {}'.format(pe)) 

      #define SSL_NOTHING  1 
      #define SSL_WRITING  2 
      #define SSL_READING  3 
      #define SSL_X509_LOOKUP 4 

      while keepreading and not sockclosed: 
       data,sockclosed,keepreading = self._read_ssl_data(2, head=True) 
       if sockclosed or not keepreading: 
        time.sleep(5) 
        continue 

       plen = struct.unpack('H', data)[0] 
       data,sockclosed,keepreading = self._read_ssl_data(plen) 

       if sockclosed or not keepreading: 
        time.sleep(5) 
        continue 

       # send thank you, ignore any errors since we appear to have gotten 
       # the message 
       try: 
        self.ssl_s.sendall(b'ty') 
       except: 
        pass 

       # extract the timestamp 

       message_ts = data[0:8] 
       msgtype = chr(data[8]) 
       message = data[9:].decode() 

       message_ts = struct.unpack('d', message_ts)[0] 
       message_ts = datetime.datetime.utcfromtimestamp(message_ts).replace(tzinfo=datetime.timezone.utc) 

       self.db.enqueue(config['group'], peer_ip, msgtype, message, message_ts) 
       self.db.pending.set() 

     # we're recommended to use the return socket object for any future operations rather than the original 
     try: 
      s = ssl_s.unwrap() 
      s.close() 
     except: 
      pass 

     connected.remove([host, peername[1]]) 
     t_name = threading.current_thread().getName() 
     logger.debug('disconnect: {}'.format(t_name)) 

    def _read_ssl_data(self, wantsize=16384, head=False): 
     _w = ['WANT_NOTHING','WANT_READ','WANT_WRITE','WANT_X509_LOOKUP'] 

     logger = self.server.logger 

     data  = b'' 
     sockclosed = False 
     keepreading = True 

     while len(data) < wantsize and keepreading and not sockclosed: 
      rlen = wantsize - len(data) 
      try: 
       w,wr = self.ssl_s.want(),self.ssl_s.want_read() 

       #logger.debug(' want({}) want_read({})'.format(_w[w],wr)) 
       x = self.ssl_s.recv(rlen) 
       #logger.debug(' recv(): {}'.format(x)) 

       if not (x or len(x)): 
        raise ZeroReturnError 

       data += x 
       if not (len(x) == len(data) == wantsize): 
        logger.info(' read={}, len(data)={}, plen={}'.format(len(x),len(data),wantsize)) 

      except WantReadError: 
       # poll(), when ready, read more 
       keepreading = False 
       logger.info(' got WantReadError') 
       continue 

      except WantWriteError: 
       # poll(), when ready, write more 
       keepreading = False 
       logger.info(' got WantWriteError') 
       continue 

      except ZeroReturnError: 
       # socket got closed, a '0' bytes read also means the same thing 
       keepreading = False 
       sockclosed = True 
       logger.info(' ZRE, socket closed normally') 
       continue 

      except SysCallError: 
       keepreading = False 
       sockclosed = True 

       t,v,tb = sys.exc_info() 

       if v.args[0] == -1: # normal EOF 
        logger.info(' EOF found, keepreading=False') 

       else: 
        logger.info('{} terminated session abruptly while reading plen'.format(self.peer_hostname)) 
        logger.info('t: {}'.format(t)) 
        logger.info('v: {}'.format(v)) 

       continue 

      except: 
       t,v,tb = sys.exc_info() 
       logger.warning(' fucked? {}'.format(v)) 
       raise 

     if not head and not len(data) == wantsize: 
      logger.warn(' short read {} of {}'.format(len(data), wantsize)) 

     return data,sockclosed,keepreading 
+0

是你当前的服务器实现实际上是线程化的吗?因为您没有正确处理对数据的线程访问,所以您正在失去跟踪您正在使用的套接字连接的情况。为了简单起见,我们假设已经建立了10个连接。看看你的代码,10个套接字连接中的哪一个是你的.recv()或.send()操作的?与并发访问,你不知道,因为没有任何区别。一个线程不会运行单独的代码副本,它会运行_same_代码。 – FirefighterBlu3

+0

所以如果你在socket.accept()有两个接近背对背的连接,它们都会到达socket.read(),但你不知道哪一个先到达,所以第一个read()会吞下所有的数据而第二个可能正在等待数据不会到达,因为你已经失去了你所拥有的连接的区别。你的代码只知道_one_,但你正在走过你的道路两次。 – FirefighterBlu3

+0

但我有一个double,在while循环结束时,我正在关闭套接字。这样我可以在循环的下一次迭代中收到新的连接。但是冲突如何得到提升? – Raj

6

它,因为你的服务器,将阻止等待这条线上

conn, addr = s.accept() 

这是因为像.accept.read呼叫阻塞持有该进程调用

你需要考虑的一个新的连接替代设计,你在哪里。

  • 有每个连接一个进程(这种想法是愚蠢的)每个连接
  • 一个线程(这种想法是不是第一,但仍然主要愚蠢多少有点愚蠢)
  • 有无阻塞设计,允许多个客户端并在不阻碍执行的情况下读/写。

为了实现第一,看multiprocessing,二是threading第三个是稍微复杂围绕让你的头,但将产生最佳效果,去库在Python事件驱动的代码是twisted但还有其他的像

还有更多,我没有在这里列出。

+0

感谢您的回复,我认为不可能处理与不同线程的每个连接。但我可以修改我的上面的代码以满足要求(bcz我已经实现了线程)? – Raj

+1

我的答案已经包含建议,看看扭曲或gevent或另一个异步框架 –

+0

如何处理与多处理的东西? – Raj

0

你可能想看看0MQ和concurrent.futures。 0MQ在库中有一个Tornado事件循环,它降低了套接字编程的复杂性。 concurrent.futures是线程或多处理的高级接口。

1

让我们开始线程TCP服务器裸露的骨头。

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer): 
    def __init__(self, server_address, HandlerClass): 
     socketserver.BaseServer.__init__(self, server_address, HandlerClass) 
     self.address_family = socket.AF_INET 
     self.socket = socket.socket(self.address_family, self.socket_type) 
     self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
     self.server_bind() 
     self.server_activate() 

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler): 
    def handle(self): 
     # self.request is your accepted socket, do all your .read() and .wirte() on it 
     s = self.request 

     request = s.read(1024) 
     # decide locked or unlocked. this example arbitrarily writes back 'locked' 
     s.write('locked') 

     # we're done, close the socket and exit with a default return of None 
     s.close() 

OK,开始你的这个线程服务器在你的main()函数:

server = threading.ThreadedTCPServer(('127.0.0.1', 1234), ThreadedTCPRequestHandler) 
t = threading.Thread(target=server.serve_forever, name='optional_name') 
t.start() 

现在你可以让线程模块处理并发的语义,而不是担心。

+0

非常感谢你的答案。但是需要导入哪些附加软件包?我有导入套接字模块,但我仍然收到错误 – Raj