2013-11-01 91 views
1

我们有一个异步单线程扭曲/ python进程。有一个X服务器在端口X上侦听。还有另一个服务器Y服务器在端口Y上侦听。Y服务器也是X服务器的客户端(执行Y请求包括将请求传递给X服务器) 。扭曲延迟:客户端连接到服务器,异步单线程

它们都应该在一个线程中异步执行。它应该像下面这样:

  • Y型服务器收到请求
  • Y型服务器创建一个X客户端:打开到X服务器的连接并发送内部请求
  • 同时 (!!!)X服务器接受内部请求
  • X服务器完成它的工作并返回内部响应
  • X客户端(属于Y服务器)接受内部响应并将其作为Y-响应

我试图实现这样的事情,但我失败了,可能是因为没有使用deferreds。据我所知推迟,他们的工作是将上面的序列分成更小的块,以便这些部分可以同时由X和Y完成。

我需要的是理解这种沟通应该如何工作的方案。一个伪代码应该做...


下面是我的失败尝试的简短代码。

有主业务类,它由协议/工厂类:

class PyCached(protocol.Protocol): 
    def __init__(self, factory, verbose): 
     self.factory = factory 
     self.verbose = verbose 
    def dataReceived(self, data): 
     log.msg(data) 
     if self.verbose: 
      print 'received: %s' % (data,) 
     request = json.loads(data) 
     if self.verbose: 
      print 'request: %s' % (request,) 
     command = "handle_%s" % (request.pop('command'),) 
     if self.verbose: 
      print 'command: %s\n' % (command,) 
     result = getattr(self.factory, command)(**request) 
     self.transport.write(result + "\n") 

class PyCachedFactory(protocol.Factory): 
    def __init__(self, verbose=False): 
     self.clear() 
     self.start_time = datetime.now() 
     self.verbose = verbose 
     log.msg('starts on %s, verbose=%s' % (self.start_time, self.verbose)) 

    # many many more commands performed by factory 

还有一个HTTP访问服务器:

from twisted.web.resource import Resource 
from twisted.python import log 
from twisted.web.server import Site 
from client import PyCachedClient 

class PyCachedCommand(Resource): 
    isLeaf = True 

    def getServiceClient(self): 
     client = PyCachedClient() 
     client.connect(*self.service_address) 
     return client 

    def render_GET(self, request): 
     ''' 
     Renders service status as plain text. 
     ''' 
     log.msg('GET') 
     request.setHeader('Content-Type', 'text/plain') 
     try: 
      client = self.getServiceClient() 
      status = client.status() 
      client.close() 
      return "PyCached is up since %0.2f seconds" % (status['uptime'],) 
     except: 
      return "PyCached is down." 

    def render_POST(self, request): 
     ''' 
     Executes pycached request ad returns the response. 
     ''' 
     log.msg('POST %s' % (str(request.args))) 
     client = self.getServiceClient() 
     kwargs = {k: v[0] for k,v in request.args.iteritems()} 
     command_name = kwargs.pop('command') 
     command = getattr(client, command_name) 
     result = str(command(**kwargs)) 
     client.close() 
     request.setHeader('Content-Type', 'text/plain') 
     return result 

class PyCachedSite(Site): 
    ''' 
    Performs all operations for PyCached HTTP access. 
    ''' 
    def __init__(self, service_address, **kwargs): 
     resource = PyCachedCommand() 
     resource.service_address = service_address 
     Site.__init__(self, resource, **kwargs) 

在HTTP使用主服务的客户端,其是用简单的套接字来实现的 - 而且这可能是问题所在,因为这些客户套接字调用被阻塞了:

class PyCachedClient(object): 
    def __init__(self): 
     self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

    def connect(self, host, port): 
     try: 
      self.s.connect((host, port)) 
     except socket.error: 
      raise RuntimeError('Something went wrong with PyCached.') 

    def close(self): 
     self.s.close() 

    def _receive(self): 
     received = self.s.recv(1024) 
     decoded = json.loads(received.rstrip('\n')) 
     return decoded['value'] if decoded.has_key('value') else None 

    def _send(self, command, options={}): 
     request = {'command': command} 
     request.update(options) 
     self.s.sendall(json.dumps(request)) 

    def version(self): 
     self._send('version') 
     return self._receive() 

    # many many more commands similar to version 

最后,一切由twistd来/ TAC文件运行 - 因此它驻留在单个线程中:

from twisted.application import internet, service 
from server.service import PyCachedFactory 
from server.http import PyCachedSite 

application = service.Application('pycached') 
# pycached core service 
pycachedService = internet.TCPServer(8001, PyCachedFactory()) 
pycachedService.setServiceParent(application) 
# pycached http access 
addr = ('localhost', 8001) 
pycachedHttp = internet.TCPServer(8002, PyCachedSite(addr)) 
pycachedHttp.setServiceParent(application) 

当我telnet 8001(主服务),例如{"command":"version"},一切都很好。但是当我询问http时,一切都会阻塞,因为客户端套接字被阻塞,主服务永远无法响应。

+0

请发布一个“失败的尝试”的最小例子,以便我们可以评估它的问题。 – Glyph

+0

@Glyph对不起,花了这么长时间,但很难切断代码。所以在这里 - 我试图提供一个最小的不工作代码。 – ducin

+1

不知怎的,我碰到让 - 保罗卡尔德龙的答案http://stackoverflow.com/a/3275944/769384,我认为这可能会导致适当的解决方案。但我想不出从http服务器级别访问reactor的好方法... – ducin

回答

1

延迟只是一个管理回调的工具。 X客户端必须提供某种方式来在收到结果时添加回调,并且您应该从此回调中发送Y响应。而延迟只是一个实现细节。