1
如何使用asyncio在两个协程之间实现管道,一个从流中读取,另一个写入其中?如何在两个Python asyncio协程之间使用读/写流?
假设我们有这个现有的代码,两个简单的脚本。一个产生到stdout:
# produce.py
import asyncio
import random
import sys
async def produce(stdout):
for i in range(10000):
await asyncio.sleep(random.randint(0, 3))
print(i, file=stdout, flush=True)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(produce(sys.stdout))
loop.close()
以及从标准输入读取其他:
# consume.py
async def consume(loop, stdin):
reader = asyncio.StreamReader(loop=loop)
reader_protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: reader_protocol, stdin)
while True:
line = await reader.readline()
if not line:
break
print(int(line) ** 2)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(consume(loop, sys.stdin))
loop.close()
很明显,因为我们的两件可以在命令行单独运行,我们可以使用subprocess
模块壳管(produce | consume
)。
但是我们希望在Python中实现等价的Unix管道,即连接这两个现有函数的流。
像这样的东西是行不通的:
pipe = io.BytesIO()
await asyncio.gather(produce(pipe),
consume(loop, pipe))
如果这两个功能将操作发电机,我们可以写这样的事情(蟒蛇3.6):
async def produce():
for i in range(10000):
await asyncio.sleep(random.randint(0, 3))
yield str(i)
async def consume(generator):
async for value in generator:
print(int(value) ** 2)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(consume(produce()))
loop.close()
有一些地方asyncio API会允许这样吗?
谢谢!
这假定流数据是换行符分隔的文本。 –
在我们的情况下,这是总是得到一个完整的路线,但我想你可以使用'read(1024)',而不是如果你想要一个二进制流。 – Natim