您应该能够安全地将asyncio
和multiprocessing
组合起来,而不会有太多麻烦,尽管您不应该直接使用multiprocessing
。 asyncio
(以及任何其他基于事件循环的异步框架)的主要罪名是阻止事件循环。如果您尝试直接使用multiprocessing
,则只要您阻止等待子进程,就会阻止事件循环。显然,这很糟糕。
避免这种情况的最简单方法是使用BaseEventLoop.run_in_executor
执行concurrent.futures.ProcessPoolExecutor
中的函数。 ProcessPoolExecutor
是一个使用multiprocessing.Process
实现的进程池,但asyncio
内置了对其中执行功能的支持,而不阻塞事件循环。这里有一个简单的例子:
import time
import asyncio
from concurrent.futures import ProcessPoolExecutor
def blocking_func(x):
time.sleep(x) # Pretend this is expensive calculations
return x * 5
@asyncio.coroutine
def main():
#pool = multiprocessing.Pool()
#out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
executor = ProcessPoolExecutor()
out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not
print(out)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
对于大多数情况下,这是单独的功能是不够好。如果你发现自己需要其他结构从multiprocessing
,像Queue
,Event
,Manager
等,有一个叫aioprocessing
第三方库(全面披露:我写的),提供所有multiprocessing
数据结构的asyncio
兼容版本。下面是一个演示示例:
import time
import asyncio
import aioprocessing
import multiprocessing
def func(queue, event, lock, items):
with lock:
event.set()
for item in items:
time.sleep(3)
queue.put(item+5)
queue.close()
@asyncio.coroutine
def example(queue, event, lock):
l = [1,2,3,4,5]
p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l))
p.start()
while True:
result = yield from queue.coro_get()
if result is None:
break
print("Got result {}".format(result))
yield from p.coro_join()
@asyncio.coroutine
def example2(queue, event, lock):
yield from event.coro_wait()
with (yield from lock):
yield from queue.coro_put(78)
yield from queue.coro_put(None) # Shut down the worker
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue = aioprocessing.AioQueue()
lock = aioprocessing.AioLock()
event = aioprocessing.AioEvent()
tasks = [
asyncio.async(example(queue, event, lock)),
asyncio.async(example2(queue, event, lock)),
]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
io事件循环在主进程中,如果我想通过子进程中的套接字发送/ recv,我该怎么做?我发现我不能简单地调用main_proc_loop.ensure_future(send_socket_data ...),因为它们处于不同的过程中?实现它的最好方法是什么?通过队列? – jon