2017-08-16 27 views
1

如何使用asyncio在两个协程之间实现管道,一个从流中读取,另一个写入其中?如何在两个Python asyncio协程之间使用读/写流?

假设我们有这个现有的代码,两个简单的脚本。一个产生到stdout:

# produce.py 

import asyncio 
import random 
import sys 

async def produce(stdout): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     print(i, file=stdout, flush=True) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(produce(sys.stdout)) 
    loop.close() 

以及从标准输入读取其他:

# consume.py 

async def consume(loop, stdin): 
    reader = asyncio.StreamReader(loop=loop) 
    reader_protocol = asyncio.StreamReaderProtocol(reader) 
    await loop.connect_read_pipe(lambda: reader_protocol, stdin) 

    while True: 
     line = await reader.readline() 
     if not line: 
      break 
     print(int(line) ** 2) 

if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(consume(loop, sys.stdin)) 
    loop.close() 

很明显,因为我们的两件可以在命令行单独运行,我们可以使用subprocess模块壳管(produce | consume)。

但是我们希望在Python中实现等价的Unix管道,即连接这两个现有函数的流。

像这样的东西是行不通的:

pipe = io.BytesIO() 

await asyncio.gather(produce(pipe), 
        consume(loop, pipe)) 

如果这两个功能将操作发电机,我们可以写这样的事情(蟒蛇3.6):

async def produce(): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     yield str(i) 


async def consume(generator): 
    async for value in generator: 
     print(int(value) ** 2) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(consume(produce())) 
    loop.close() 

有一些地方asyncio API会允许这样吗?

谢谢!

回答

2

解决这个的办法,就是把你的当前功能为发电机,并写一些包装与UNIX管道揭露他们:

# wrapper.py 

import asyncio 
import random 
import sys 


async def produce(): 
    for i in range(10000): 
     await asyncio.sleep(random.randint(0, 3)) 
     yield str(i) 


async def consume(generator): 
    async for value in generator: 
     print(int(value) ** 2) 


async def system_out_generator(loop, stdout, generator): 
    async for line in generator: 
     print(line, file=stdout, flush=True) 


async def system_in_generator(loop, stdin): 
    reader = asyncio.StreamReader(loop=loop) 
    reader_protocol = asyncio.StreamReaderProtocol(reader) 
    await loop.connect_read_pipe(lambda: reader_protocol, stdin) 
    while True: 
     line = await reader.readline() 
     if not line: 
      break 
     yield line 


async def main(loop): 
    try: 
     if sys.argv[1] == "produce": 
      await system_out_generator(loop, sys.stdout, produce()) 
     elif sys.argv[1] == "consume": 
      await consume(system_in_generator(loop, sys.stdin)) 
    except IndexError: 
     await consume(produce()) 


if __name__ == "__main__": 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(main(loop)) 

您可以使用:

python wrapper.py # Python generators 

或:

python wrapper.py produce | python wrapper.py consume # System pipes 
+0

这假定流数据是换行符分隔的文本。 –

+0

在我们的情况下,这是总是得到一个完整的路线,但我想你可以使用'read(1024)',而不是如果你想要一个二进制流。 – Natim

相关问题