2017-06-30 62 views
6

两个aiohttp.web.Application()对象可以在相同的过程中运行,例如在不同的端口?多个aiohttp Application()在同一个进程中运行?

我看到一堆的aiohttp类似的代码示例:

from aiohttp import web 
app = web.Application() 
app.router.add_get('/foo', foo_view, name='foo') 
web.run_app(app, host='0.0.0.0', port=10000) 

我不知道是否有一些相当于在多个web.Applications()可以被配置为在同一时间运行。喜欢的东西:

from aiohttp import web 
app1 = web.Application() 
app1.router.add_get('/foo', foo_view, name='foo') 
app2 = web.Application() 
app2.router.add_get('/bar', bar_view, name='bar') 
# This is the wishful thinking code: 
web.configure_app(app1, host='0.0.0.0', port=10000) 
web.configure_app(app2, host='0.0.0.0', port=10001) 
web.run_apps() 

我使用的情况是,我有一个现有的Python Web框架,做这种事情,和我建立一个原型,在蟒蛇3.6与aiohttp类似。

我知道多个python服务器可以运行在nginx(另请参阅http://aiohttp.readthedocs.io/en/stable/deployment.html);那不是我所追求的。我想探索两个具有相同asyncio事件循环的aiohttp web服务器的可能性,它运行在同一个python进程中,在两个不同的端口上运行。

回答

5

是的,你可以 - 只需编写一些包装重新实施run_app

这是一个简单的例子。 run_app的所有特定于应用程序的部分均被移至专用类AppWrapperMultiApp仅负责初始化所有配置的应用程序,继续运行循环并清理。

import asyncio 
from aiohttp import web 


class AppWrapper: 

    def __init__(self, aioapp, port, loop): 
     self.port = port 
     self.aioapp = aioapp 
     self.loop = loop 
     self.uris = [] 
     self.servers = [] 

    def initialize(self): 
     self.loop.run_until_complete(self.aioapp.startup()) 
     handler = self.aioapp.make_handler(loop=self.loop) 

     server_creations, self.uris = web._make_server_creators(
      handler, loop=self.loop, ssl_context=None, 
      host=None, port=self.port, path=None, sock=None, 
      backlog=128) 

     self.servers = self.loop.run_until_complete(
      asyncio.gather(*server_creations, loop=self.loop) 
     ) 

    def shutdown(self): 
     server_closures = [] 
     for srv in self.servers: 
      srv.close() 
      server_closures.append(srv.wait_closed()) 
     self.loop.run_until_complete(
      asyncio.gather(*server_closures, loop=self.loop)) 

     self.loop.run_until_complete(self.aioapp.shutdown()) 

    def cleanup(self): 
     self.loop.run_until_complete(self.aioapp.cleanup()) 

    def show_info(self): 
     print("======== Running on {} ========\n".format(', '.join(self.uris))) 


class MultiApp:  

    def __init__(self, loop=None): 
     self._apps = [] 
     self.user_supplied_loop = loop is not None 
     if loop is None: 
      self.loop = asyncio.get_event_loop() 
     else: 
      self.loop = loop 

    def configure_app(self, app, port): 
     app._set_loop(self.loop) 
     self._apps.append(
      AppWrapper(app, port, self.loop) 
     ) 

    def run_all(self): 
     try: 
      for app in self._apps: 
       app.initialize() 
      try: 
       for app in self._apps: 
        app.show_info() 
       print("(Press CTRL+C to quit)") 
       self.loop.run_forever() 
      except KeyboardInterrupt: # pragma: no cover 
       pass 
      finally: 
       for app in self._apps: 
        app.shutdown() 
     finally: 
      for app in self._apps: 
       app.cleanup() 

     if not self.user_supplied_loop: 
      self.loop.close() 

注意:注意使用的内部aiohttp的方法,也可能会受到变化的。

现在让我们使用它:

from aiohttp import web 

async def handle1(request): 
    return web.Response(text='SERVER 1') 


async def handle2(request): 
    return web.Response(text='SERVER 2') 

app1 = web.Application() 
app1.router.add_get('/', handle1) 

app2 = web.Application() 
app2.router.add_get('/', handle2) 

ma = MultiApp() 
ma.configure_app(app1, port=8081) 
ma.configure_app(app2, port=8071) 
ma.run_all() 

作为一个侧面说明,再想想为什么你需要这个。在几乎所有情况下,解耦都是更好的选择。在同一个过程中设置许多端点使它们相互依赖。有一种情况出现在我的脑海里,并且具有“良好”的推理,即内部统计/调试端点。

1

虽然上述答案已被接受,这里是另一种方法:

创建test.py:

from aiohttp import web 
import asyncio 
import sys 

@asyncio.coroutine 
def status1(request): 
    return web.json_response('App1 OK') 

@asyncio.coroutine 
def status2(request): 
    return web.json_response('App2 OK') 

def start(): 
    try: 
     loop = asyncio.get_event_loop() 

     # App1 
     app1 = web.Application() 
     app1.router.add_get('/status', status1) 
     handler1 = app1.make_handler() 
     coroutine1 = loop.create_server(handler1, '0.0.0.0', 8081) 
     server1 = loop.run_until_complete(coroutine1) 
     address1, port1 = server1.sockets[0].getsockname() 
     print('App1 started on http://{}:{}'.format(address1, port1)) 

     # App2 
     app2 = web.Application() 
     app2.router.add_get('/status', status2) 
     handler2 = app2.make_handler() 
     coroutine2 = loop.create_server(handler2, '0.0.0.0', 8082) 
     server2 = loop.run_until_complete(coroutine2) 
     address2, port2 = server2.sockets[0].getsockname() 
     print('App2 started on http://{}:{}'.format(address2, port2)) 

     try: 
      loop.run_forever() 
     except KeyboardInterrupt: 
      pass 
     finally: 
      server1.close() 
      loop.run_until_complete(app1.shutdown()) 
      loop.run_until_complete(handler1.shutdown(60.0)) 
      loop.run_until_complete(handler1.finish_connections(1.0)) 
      loop.run_until_complete(app1.cleanup()) 

      server2.close() 
      loop.run_until_complete(app2.shutdown()) 
      loop.run_until_complete(handler2.shutdown(60.0)) 
      loop.run_until_complete(handler2.finish_connections(1.0)) 
      loop.run_until_complete(app2.cleanup()) 

     loop.close() 
    except Exception as e: 
     sys.stderr.write('Error: ' + format(str(e)) + "\n") 
     sys.exit(1) 

if __name__ == '__main__': 
    start() 

在终端,打开两个标签。在一个选项卡,运行

python test.py 

在其他选项卡,运行

curl -X GET http://localhost:8081/status 
curl -X GET http://localhost:8082/status 

你会得到响应

"App1 OK" 
"App2 OK" 
相关问题