2017-09-08 33 views
2

选择一样的功能我遇到一个行为同时努力(Python的3.6)与asyncio实现select()样的功能我不明白:实现与ASYNCIO

  • 我有2个队列( & )其中消息由异步生产者Queue.put
  • 我有一个异步消费者(所述选择器),该轮询队列和需要可用的第一消息(即。使用asyncio.wait_for的类似select()的功能。

轮询是这样的:

poll = list(_.get() for _ in queues) 
while True: 
    done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED) 

然后,迭代的done期货,我有一个新的Queue.get替换poll相应的条目:

for f in done: 
     try: 
      i = poll.index(f._coro) 
      ... 
      poll[i] = queues[i].get() 

现在,奇我所经历的行为是选择有效,但done期货.result()中的一些返回None,并且通过队列发送的消息都将丢失:

PUT B ('B', 0)/0 
GET B ('B', 0)/0 
PUT A ('A', 0)/0 
GET A None/0   << ('A', 0) is never received 
PUT A ('A', 1)/0 
GET A ('A', 1)/0 
PUT B ('B', 1)/0 
GET B None/0 
PUT A ('A', 2)/0 
GET A None/0 
PUT B ('B', 2)/0 
GET B None/0 

下面的代码

#!/usr/bin/env python3 
import asyncio, random 

async def queue_generator(name, queue, speed=1): 
    counter = 0 
    while True: 
     t = (random.random() + 0.5) * speed 
     await asyncio.sleep(t) 
     m = (name, counter) 
     print ("PUT {0} {1}/{2:d}".format(name, m, queue.qsize())) 
     queue.put_nowait(m) 
     counter += 1 

async def select(*queues): 
    poll = list(_.get() for _ in queues) 
    while True: 
     # That's the select()-like functionalit 
     done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED) 
     for f in done: 
      i = poll.index(f._coro) 
      # That's where sometimes v is None 
      v = f.result() 
      print ("GET {0} {1}/{2}".format("AB"[i], v, queues[i].qsize())) 
      poll[i] = queues[i].get() 
     await asyncio.sleep(0.5) 

if __name__ == "__main__": 
    loop  = asyncio.get_event_loop() 
    queue_a = asyncio.Queue(10) 
    queue_b = asyncio.Queue(10) 
    tasks = (
     loop.create_task(queue_generator("A", queue_a, 3)), 
     loop.create_task(queue_generator("B", queue_b, 3)), 
     loop.create_task(select(queue_a, queue_b)) 
    ) 
    loop.run_until_complete(asyncio.wait(tasks)) 
    loop.close() 

由于这种情况,只要存在,目前有一个消息队列的变化,我认为问题在于,这样做与pending期货。实际上,加入

for f in pending: 
     f.cancel() 

解决了这个问题,但我想尽可能多地重复使用这些期货。我想这个问题来自于这样一个事实,即asyncio.wait_for默默地将发电机列表转换为任务。

+1

'get'是一个协程。你需要使用'await queue.get()'或'queue.get_nowait()'。 – dirn

+1

相关:[awaitchannel](https://github.com/pothos/awaitchannel) – Vincent

+0

@Vincent不是'await asyncio.wait(tuple(poll)...)'正在做那个吗?通过按顺序执行'await queue.get()',我会阻止整个过程,如果我们说第一个队列没有消息但第二个队列被填充。 –

回答

1

我没有深入挖掘到会发生什么,但移动poll内循环产生似乎解决的事情:

async def select(*queues): 
    while True:  
     poll = list(_.get() for _ in queues) # HERE  
     # That's the select()-like functionalit 
     done, pending = await asyncio.wait(tuple(poll), return_when=asyncio.FIRST_COMPLETED) 

poll是协同程序的列表。通常每个独特的协同程序都应该等待一次。否则可能会导致奇怪的事情。

+0

它确实解决了这个问题,并且解释是因为'Queue.get()'是一个协程,所以不能等待两次,这会发生在第一次循环之后的挂起协程。无论如何,'select()'通常在循环内完成,而不是在之前完成。谢谢! –