2017-08-05 88 views
0
while True: 
     if (len(queue) > 0): 
      blocking_send_from_queue() 
      there_is_something_to_send = False 
     else: 
      break 
     data = non_blocking_recv() 
     queue = process(queue, data) 

这是来自客户端的代码,它应该处理查询。python的asyncio可以做些什么来改善这种功能?

要完成查询,可能需要根据收到的数据创建更多的子查询,并将它们发送到服务器,然后处理子查询的结果等。

我可以使用asyncio创建更高效​​或扩展的结果吗?

+0

这看起来很容易出现竞争状况。 – user2357112

回答

0

您可以尝试这样的事:

import asyncio 

loop = asyncio.get_event_loop() 
data_queue = asyncio.Queue(loop=loop) 
receiving = True 

def forward(data): 
    queue = process(queue, data) 
    if (len(queue) > 0): 
     blocking_send_from_queue() 
     return True 
    else: 
     return False 

async def receive(): 
    while receiving: 
     data = await non_blocking_recv() 
     await data_queue.put(data) 

async def main(): 
    receiver = loop.create_task(receive()) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, forward, data): 
      global receiving 
      receiving = False 
      data_queue.get_no_wait() 
      break 
    await receiver 

loop.run_until_complete(main()) 

这样,non_blocking_recv()blocking_send_from_queue()可以并行而不阻塞互相data_queue运行,在缓冲的成本在内存data块。您可以将队列大小设置为data_queue来控制缓冲区大小。

此外,process()blocking_send_from_queue()看起来像可以并行执行的方法。这取决于实际执行情况,我假设queue是这里一个持久的threading.Queue对象。因此forward可以分成:(跳过关机代码)

def do_process(data): 
    process(queue, data) 
    return len(queue) > 0 

def forward(): 
    while True: 
     blocking_send_from_queue() 

而且main条改为:

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    while True: 
     data = await data_queue.get() 
     if not await loop.run_in_executor(None, do_process, data): 
      break 

理想的是,至多一个non_blocking_recv,一个process和一个blocking_send_from_queue可以并行运行,如果GIL在blocking_send_from_queue内正确释放。

最后,如果GIL甚至在process中发布,并且发送顺序不要求与接收顺序相同,甚至可以通过并行运行process充分利用多核计算能力: (部分代码)

async def process_worker(): 
    while True: 
     data = await data_queue.get() 
     loop.run_in_executor(None, process, queue, data) 

async def main(): 
    loop.create_task(receive()) 
    loop.run_in_executor(None, forward) 
    for _ in range(CPU_CORE_NUM): 
     loop.create_task(process_worker()) 
    # wait for all tasks here 
相关问题