2017-01-16

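I have a pool_map function that can be used to limit the number of functions executing at the same time. How can I make this asyncio pool cancellable?

The idea is to have a coroutine function that takes a single argument and is mapped over a list of possible arguments, while also wrapping every call in a semaphore acquisition, so that only a limited number of them run at once: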

from typing import Awaitable, Callable, Iterable, Iterator, TypeVar
from asyncio import Semaphore

A = TypeVar('A')
V = TypeVar('V')

# A plain def, not async def: it only builds a lazy map of coroutines.
def pool_map(
    func: Callable[[A], Awaitable[V]],
    arg_it: Iterable[A],
    size: int = 10
) -> Iterator[Awaitable[V]]:
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = Semaphore(size)

    async def sub(arg):
        # At most `size` calls can be inside this block at the same time.
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)
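To check that the semaphore really bounds concurrency, here is a small self-contained sketch. It repeats the pool_map above and adds a hypothetical `measure_peak` helper that counts how many calls are inside `func` at the same moment; with ten jobs and `size=3`, the peak should be exactly 3.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, Iterator, TypeVar

A = TypeVar("A")
V = TypeVar("V")


def pool_map(
    func: Callable[[A], Awaitable[V]], arg_it: Iterable[A], size: int = 10
) -> Iterator[Awaitable[V]]:
    # Same shape as the question's pool_map: a lazy map of semaphore-guarded coroutines.
    semaphore = asyncio.Semaphore(size)

    async def sub(arg: A) -> V:
        async with semaphore:
            return await func(arg)

    return map(sub, arg_it)


async def measure_peak(n_jobs: int, size: int) -> int:
    """Hypothetical helper: run n_jobs dummy calls and report peak concurrency."""
    running = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # hold the semaphore slot for a moment
        running -= 1
        return i

    # gather() wraps each coroutine in a Task and preserves argument order.
    results = await asyncio.gather(*pool_map(job, range(n_jobs), size))
    assert results == list(range(n_jobs))
    return peak


if __name__ == "__main__":
    print(asyncio.run(measure_peak(10, 3)))  # expected: 3
```

The semaphore is created when pool_map is called, which here happens inside a running event loop.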

For the sake of the example I adapted the code above without testing it, but my own variant works fine. You can use it like this, for example:

import asyncio
from asyncio import as_completed

URLS = [...]

async def run_all(awaitables):
    for a in as_completed(awaitables):
        result = await a
        print('got result', result)

async def download(url): ...


async def main():
    # Build the pool inside the running event loop, then consume it.
    pool = pool_map(download, URLS)
    await run_all(pool)


if __name__ == '__main__':
    asyncio.run(main())

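But if an exception is raised while awaiting one of the futures, there is a problem: I can't see how to cancel all the tasks that are scheduled or still running, nor the ones still waiting to acquire the semaphore.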
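The problem can be reproduced in a few lines. In this sketch (`job` and `pending_after_failure` are hypothetical names), the first job fails, the exception leaves the `as_completed` loop, and the remaining jobs are still pending because nothing cancelled them. The function cancels them itself at the end only so the demo does not leave tasks behind.

```python
import asyncio


async def job(n: int) -> int:
    await asyncio.sleep(n * 0.05)
    if n == 1:
        raise ValueError("boom")
    return n


async def pending_after_failure() -> int:
    tasks = [asyncio.ensure_future(job(n)) for n in range(1, 5)]
    try:
        for fut in asyncio.as_completed(tasks):
            await fut
    except ValueError:
        pass  # the failure escaped the loop...
    # ...but nothing cancelled the sibling tasks: they are still scheduled.
    still_pending = sum(not t.done() for t in tasks)
    for t in tasks:  # clean up so the demo leaves nothing running
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return still_pending


if __name__ == "__main__":
    print(asyncio.run(pending_after_failure()))  # expected: 3
```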

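Is there a library or an elegant building block for this that I don't know about, or do I have to build all the parts myself? (i.e. a Semaphore that gives access to its waiters, an as_finished that gives access to its queue of running tasks, ...)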

Answers

Use ensure_future to get a Task instead of a coroutine:

import asyncio


def pool_map(func, args, size=10):
    """
    Maps an async function to iterables
    ensuring that only some are executed at once.
    """
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    # ensure_future wraps each coroutine in a Task and schedules it right away,
    # so every call, even one still waiting on the semaphore, can be cancelled.
    tasks = [asyncio.ensure_future(sub(x)) for x in args]

    return tasks


async def f(n):
    print(">>> start", n)

    if n == 7:
        raise Exception("boom!")

    await asyncio.sleep(n/10)

    print("<<< end", n)
    return n


async def run_all(tasks):
    exc = None
    for a in asyncio.as_completed(tasks):
        try:
            result = await a
            print('=== result', result)
        except asyncio.CancelledError as e:
            print("!!! cancel", e)
        except Exception as e:
            print("Exception in task, cancelling!")
            for t in tasks:
                t.cancel()  # no-op for tasks that have already finished
            exc = e
    if exc:
        raise exc


async def main():
    # The tasks must be created while the event loop is running.
    pool = pool_map(f, range(1, 20), 3)
    await run_all(pool)


asyncio.run(main())
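What makes this work is that each call is a Task, and cancelling a Task that is still blocked in `async with semaphore` ends it before `func` is ever entered. A small check of that, reusing this answer's pool_map with a hypothetical `cancel_on_first_error` driver; it waits with `asyncio.wait(..., return_when=asyncio.FIRST_EXCEPTION)` instead of the as_completed loop only to keep the example short.

```python
import asyncio


def pool_map(func, args, size=10):
    # Copy of the answer's pool_map: every call is wrapped in a Task up front.
    semaphore = asyncio.Semaphore(size)

    async def sub(arg):
        async with semaphore:
            return await func(arg)

    return [asyncio.ensure_future(sub(x)) for x in args]


async def cancel_on_first_error(n_jobs, size):
    """Hypothetical driver: fail the first job, then cancel everything."""
    started = []  # arguments that func actually began running with

    async def job(n):
        started.append(n)
        if n == 0:
            raise ValueError("boom")
        await asyncio.sleep(1)
        return n

    tasks = pool_map(job, range(n_jobs), size)
    # Returns as soon as any task has raised (job 0 fails immediately).
    await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    for t in tasks:
        t.cancel()  # no-op for tasks that already finished
    # Collect outcomes so no exception or cancellation goes unobserved.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    return started, tasks, outcomes


if __name__ == "__main__":
    started, tasks, outcomes = asyncio.run(cancel_on_first_error(5, 1))
    print(started, [t.cancelled() for t in tasks])
```

With `size=1`, the last task never gets the semaphore, so it ends up cancelled without its job ever starting.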

Here is a naive solution, relying on the fact that cancel is a no-op if the task has already finished:

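import asyncio
from asyncio import as_completed


async def run_all(awaitables):
    futures = [asyncio.ensure_future(a) for a in awaitables]
    try:
        for fut in as_completed(futures):
            result = await fut
            print('got result', result)
    except BaseException:
        # cancel() does nothing for futures that are already done
        for future in futures:
            future.cancel()
        # let the cancellations take effect, then propagate the original error
        await asyncio.wait(futures)
        raise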
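The fact this answer relies on is easy to check: Task.cancel() returns False and changes nothing when the task has already finished, so cancelling the whole list after a failure only affects the tasks that are still pending. A minimal sketch with hypothetical names:

```python
import asyncio


async def quick() -> int:
    return 42


async def cancel_after_done():
    """Hypothetical demo: cancel() on an already finished task is a no-op."""
    task = asyncio.ensure_future(quick())
    result = await task          # the task finishes normally here
    accepted = task.cancel()     # False: nothing left to cancel
    return result, accepted, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(cancel_after_done()))  # expected: (42, False, False)
```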