您可以尝试这样的事:
import asyncio
loop = asyncio.get_event_loop()
data_queue = asyncio.Queue(loop=loop)
receiving = True
def forward(data):
queue = process(queue, data)
if (len(queue) > 0):
blocking_send_from_queue()
return True
else:
return False
async def receive():
while receiving:
data = await non_blocking_recv()
await data_queue.put(data)
async def main():
receiver = loop.create_task(receive())
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, forward, data):
global receiving
receiving = False
data_queue.get_no_wait()
break
await receiver
loop.run_until_complete(main())
这样,non_blocking_recv()
和blocking_send_from_queue()
可以并行而不阻塞互相data_queue
运行,在缓冲的成本在内存data
块。您可以将队列大小设置为data_queue
来控制缓冲区大小。
此外,process()
和blocking_send_from_queue()
看起来像可以并行执行的方法。这取决于实际执行情况,我假设queue
是这里一个持久的threading.Queue
对象。因此forward
可以分成:(跳过关机代码)
def do_process(data):
process(queue, data)
return len(queue) > 0
def forward():
while True:
blocking_send_from_queue()
而且main
条改为:
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
while True:
data = await data_queue.get()
if not await loop.run_in_executor(None, do_process, data):
break
理想的是,至多一个non_blocking_recv
,一个process
和一个blocking_send_from_queue
可以并行运行,如果GIL在blocking_send_from_queue
内正确释放。
最后,如果GIL甚至在process
中发布,并且发送顺序不要求与接收顺序相同,甚至可以通过并行运行process
充分利用多核计算能力: (部分代码)
async def process_worker():
while True:
data = await data_queue.get()
loop.run_in_executor(None, process, queue, data)
async def main():
loop.create_task(receive())
loop.run_in_executor(None, forward)
for _ in range(CPU_CORE_NUM):
loop.create_task(process_worker())
# wait for all tasks here
这看起来很容易出现竞争状况。 – user2357112