选择一样的功能我遇到一个行为同时努力(Python的3.6)与asyncio
实现select()
样的功能我不明白:实现与ASYNCIO
- 我有2个队列(甲 & 乙)其中消息由异步生产者
Queue.put
- 我有一个异步消费者(所述选择器),该轮询队列和需要可用的第一消息(即。使用
asyncio.wait_for
的类似select()
的功能。
轮询是这样的:
poll = list(_.get() for _ in queues)
while True:
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
然后,迭代的done
期货,我有一个新的Queue.get
替换poll
相应的条目:
for f in done:
try:
i = poll.index(f._coro)
...
poll[i] = queues[i].get()
现在,奇我所经历的行为是选择有效,但done
期货.result()
中的一些返回None
,并且通过队列发送的消息都将丢失:
PUT B ('B', 0)/0
GET B ('B', 0)/0
PUT A ('A', 0)/0
GET A None/0 << ('A', 0) is never received
PUT A ('A', 1)/0
GET A ('A', 1)/0
PUT B ('B', 1)/0
GET B None/0
PUT A ('A', 2)/0
GET A None/0
PUT B ('B', 2)/0
GET B None/0
下面的代码
#!/usr/bin/env python3
import asyncio, random
async def queue_generator(name, queue, speed=1):
counter = 0
while True:
t = (random.random() + 0.5) * speed
await asyncio.sleep(t)
m = (name, counter)
print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize()))
queue.put_nowait(m)
counter += 1
async def select(*queues):
poll = list(_.get() for _ in queues)
while True:
# That's the select()-like functionalit
done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED)
for f in done:
i = poll.index(f._coro)
# That's where sometimes v is None
v = f.result()
print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize()))
poll[i] = queues[i].get()
await asyncio.sleep(0.5)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
queue_a = asyncio.Queue(10)
queue_b = asyncio.Queue(10)
tasks = (
loop.create_task(queue_generator("A", queue_a, 3)),
loop.create_task(queue_generator("B", queue_b, 3)),
loop.create_task(select(queue_a, queue_b))
)
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
由于这种情况,只要存在,目前有一个消息队列的变化,我认为问题在于,这样做与pending
期货。实际上,加入
for f in pending:
f.cancel()
解决了这个问题,但我想尽可能多地重复使用这些期货。我想这个问题来自于这样一个事实,即asyncio.wait_for
默默地将发电机列表转换为任务。
'get'是一个协程。你需要使用'await queue.get()'或'queue.get_nowait()'。 – dirn
相关:[awaitchannel](https://github.com/pothos/awaitchannel) – Vincent
@Vincent不是'await asyncio.wait(tuple(poll)...)'正在做那个吗?通过按顺序执行'await queue.get()',我会阻止整个过程,如果我们说第一个队列没有消息但第二个队列被填充。 –