2016-07-28 60 views
1

假设我们有两个通常以嵌套方式一起使用的异步上下文管理器,但只有第二个的结果通常在主体中使用。例如,如果我们发现自己这个输入很多:如何在Python中嵌套异步上下文管理器3

async with context_mgr_1() as cm1: 
    async with cm2.context_mgr_2() as cm2: 
     ...do something with cm2... 

我们怎样才能创造出巢这些上下文管理一个上下文管理器,以便我们可以这样做:

async with context_mgr_2() as cm2: 
    ...do something with cm2... 

contextlib.nested用于为非异步上下文管理器完成此操作,但是我在asyncio中找不到这样的帮助器。

+0

现在没办法。 但您可以在contextlib.ExitStack设计 –

回答

1

在内部,我已经开始使用它作为异步管理器来同步和异步上下文管理器。它允许AsyncExitStack风格的推式语义以及简单地包装多个管理器。

这是相当不错的测试,但我不发布测试或支持此计划,让您自担风险使用...

import asyncio 
import logging 
import sys 

from functools import wraps 

class AsyncContextManagerChain(object): 

    def __init__(self, *managers): 
     self.managers = managers 
     self.stack = [] 
     self.values = [] 

    async def push(self, manager): 
     try: 
      if hasattr(manager, '__aenter__'): 
       value = await manager.__aenter__() 
      else: 
       value = manager.__enter__() 

      self.stack.append(manager) 
      self.values.append(value) 
      return value 
     except: 
      # if we encounter an exception somewhere along our enters, 
      # we'll stop adding to the stack, and pop everything we've 
      # added so far, to simulate what would happen when an inner 
      # block raised an exception. 
      swallow = await self.__aexit__(*sys.exc_info()) 
      if not swallow: 
       raise 

    async def __aenter__(self): 
     value = None 

     for manager in self.managers: 
      value = await self.push(manager) 

     return value 

    async def __aexit__(self, exc_type, exc, tb): 
     excChanged = False 
     swallow = False # default value 
     while self.stack: 
      # no matter what the outcome, we want to attempt to call __aexit__ on 
      # all context managers 
      try: 
       swallow = await self._pop(exc_type, exc, tb) 
       if swallow: 
        # if we swallow an exception on an inner cm, outer cms would 
        # not receive it at all... 
        exc_type = None 
        exc = None 
        tb = None 
      except: 
       # if we encounter an exception while exiting, that is the 
       # new execption we send upward 
       excChanged = True 
       (exc_type, exc, tb) = sys.exc_info() 
       swallow = False 

     if exc is None: 
      # when we make it to the end, if exc is None, it was swallowed 
      # somewhere along the line, and we've exited everything successfully, 
      # so tell python to swallow the exception for real 
      return True 
     elif excChanged: 
      # if the exception has been changed, we need to raise it here 
      # because otherwise python will just raise the original exception 
      if not swallow: 
       raise exc 
     else: 
      # we have the original exception still, we just let python handle it... 
      return swallow 

    async def _pop(self, exc_type, exc, tb): 
    manager = self.stack.pop() 
    if hasattr(manager, '__aexit__'): 
     return await manager.__aexit__(exc_type, exc, tb) 
    else: 
     return manager.__exit__(exc_type, exc, tb) 
2

凯文的回答不遵循contextlib.ExitStack impl在3.5.2中,所以我已经开始创建一个基于python 3.5.2的官方impl。如果我发现任何问题,我会更新impl。

GitHub的要点链接:https://gist.github.com/thehesiod/b8442ed50e27a23524435a22f10c04a0

+0

后实现AsyncExitStack我已根据编辑建议更新了impl,并且还支持__enter __/__ exit__ – amohr