我有一个pool_map
函数可以用来限制同时执行的函数的数量。如何使asyncio池可取消?
的想法是具有coroutine function接受被映射到的可能的参数的列表的单个参数,而且还包所有函数调用成旗语取得,于是只有有限数量的运行一次:
from typing import Callable, Awaitable, Iterable, Iterator
from asyncio import Semaphore
A = TypeVar('A')
V = TypeVar('V')
async def pool_map(
func: Callable[[A], Awaitable[V]],
arg_it: Iterable[A],
size: int=10
) -> Generator[Awaitable[V], None, None]:
"""
Maps an async function to iterables
ensuring that only some are executed at once.
"""
semaphore = Semaphore(size)
async def sub(arg):
async with semaphore:
return await func(arg)
return map(sub, arg_it)
为了举例,我修改了上面的代码并没有对它进行测试,但是我的变体运行良好。例如。你可以这样使用它:
from asyncio import get_event_loop, coroutine, as_completed
from contextlib import closing
URLS = [...]
async def run_all(awaitables):
for a in as_completed(awaitables):
result = await a
print('got result', result)
async def download(url): ...
if __name__ != '__main__':
pool = pool_map(download, URLS)
with closing(get_event_loop()) as loop:
loop.run_until_complete(run_all(pool))
但是如果在等待未来时抛出异常,就会出现问题。我无法看到如何取消所有计划或仍在运行的任务,也没有人正在等待信号量的获取。
有没有一个图书馆或这个我不知道的优雅建筑模块,还是我必须自己构建所有零件? (即Semaphore
可以访问它的服务生,一个as_finished
提供其运行的任务队列的访问,...)