35

由于几乎每个人都知道他们什么时候第一次看到Python中的线程,所以GIL让那些真正想要并行处理的人们生活悲惨 - 或者至少给它一个机会。什么样的问题(如果有的话)会结合asyncio和多处理?

我目前正在寻找像反应堆模式一样的东西。实际上,我希望在一个线程上监听传入的套接字连接,并且当有人试图连接时,接受该连接并将其传递给另一个线程(如处理)。

我不(确定)我可能会面对什么样的负载。我知道目前在收到消息时设置了2MB的上限。理论上我们可以每秒获得数千次(尽管我不知道实际上我们是否看到过类似的东西)。花费在处理邮件上的时间量不是,重要的是,虽然明显更快会更好。

我正在研究Reactor模式,并开发了一个小例子,使用multiprocessing库(至少在测试中)似乎工作得很好。然而,现在/很快我们将有asyncio库可用,这将为我处理事件循环。

有什么可以咬我asynciomultiprocessing

回答

40

您应该能够安全地将asynciomultiprocessing组合起来,而不会有太多麻烦,尽管您不应该直接使用multiprocessingasyncio(以及任何其他基于事件循环的异步框架)的主要罪名是阻止事件循环。如果您尝试直接使用multiprocessing,则只要您阻止等待子进程,就会阻止事件循环。显然,这很糟糕。

避免这种情况的最简单方法是使用BaseEventLoop.run_in_executor执行concurrent.futures.ProcessPoolExecutor中的函数。 ProcessPoolExecutor是一个使用multiprocessing.Process实现的进程池,但asyncio内置了对其中执行功能的支持,而不阻塞事件循环。这里有一个简单的例子:

import time 
import asyncio 
from concurrent.futures import ProcessPoolExecutor 

def blocking_func(x): 
    time.sleep(x) # Pretend this is expensive calculations 
    return x * 5 

@asyncio.coroutine 
def main(): 
    #pool = multiprocessing.Pool() 
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop. 
    executor = ProcessPoolExecutor() 
    out = yield from loop.run_in_executor(executor, blocking_func, 10) # This does not 
    print(out) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main()) 

对于大多数情况下,这是单独的功能是不够好。如果你发现自己需要其他结构从multiprocessing,像QueueEventManager等,有一个叫aioprocessing第三方库(全面披露:我写的),提供所有multiprocessing数据结构的asyncio兼容版本。下面是一个演示示例:

import time 
import asyncio 
import aioprocessing 
import multiprocessing 

def func(queue, event, lock, items): 
    with lock: 
     event.set() 
     for item in items: 
      time.sleep(3) 
      queue.put(item+5) 
    queue.close() 

@asyncio.coroutine 
def example(queue, event, lock): 
    l = [1,2,3,4,5] 
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start() 
    while True: 
     result = yield from queue.coro_get() 
     if result is None: 
      break 
     print("Got result {}".format(result)) 
    yield from p.coro_join() 

@asyncio.coroutine 
def example2(queue, event, lock): 
    yield from event.coro_wait() 
    with (yield from lock): 
     yield from queue.coro_put(78) 
     yield from queue.coro_put(None) # Shut down the worker 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    queue = aioprocessing.AioQueue() 
    lock = aioprocessing.AioLock() 
    event = aioprocessing.AioEvent() 
    tasks = [ 
     asyncio.async(example(queue, event, lock)), 
     asyncio.async(example2(queue, event, lock)), 
    ] 
    loop.run_until_complete(asyncio.wait(tasks)) 
    loop.close() 
+0

io事件循环在主进程中,如果我想通过子进程中的套接字发送/ recv,我该怎么做?我发现我不能简单地调用main_proc_loop.ensure_future(send_socket_data ...),因为它们处于不同的过程中?实现它的最好方法是什么?通过队列? – jon

3

是的,有很多位可能(或可能不会)咬你。

  • 当你运行类似asyncio的东西时,它期望在一个线程或进程上运行。这不(本身)并行处理工作。你不知何故必须在一个线程/进程中离开IO操作(特别是套接字上的IO操作)的同时分配工作。
  • 尽管将不同处理程序过程的个别连接转移的想法很好,但实现起来很困难。第一个障碍是您需要一种方法将连接从asyncio中取出,而不必关闭它。下一个障碍是,除非您使用C扩展的平台特定(可能是Linux)代码,否则您不能简单地将文件描述符发送到不同的进程。
  • 请注意,multiprocessing模块已知可创建多个用于通信的线程。大多数情况下,当您使用通信结构(例如Queue s)时,会生成一个线程。不幸的是,这些线程并非完全不可见。例如,他们可能无法完全拆除(当您打算终止您的程序时),但根据其数量,资源使用情况可能会显着。

如果你真的打算处理个别过程中的个人关系,我建议检查不同的方法。例如,您可以将套接字置于侦听模式,然后同时接受来自多个工作进程的并行连接。一旦工作人员完成了处理请求,它可以接受下一个连接,因此与为每个连接分派流程相比,您仍然使用更少的资源。 Spamassassin和Apache(mpm prefork)可以使用这个工作模型。根据您的使用情况,它可能会更容易,更强大。具体而言,您可以在完成一定数量的请求后使您的工作人员死亡,并通过主进程重新生成,从而消除内存泄漏的很多负面影响。

+0

我想我的问题有点含糊 - 当我提到我会将它发送给线程时,我的意思是说它们是单独的事件循环。 –