2017-05-18 70 views
0

阅读this answer,我跑过了asyncio.tasks.as_completed。我不明白这个功能是如何工作的。它被记录为一个非异步例程,它按照它们完成的顺序返回期货。 它创建一个与事件循环相关联的队列,为每个将来添加一个完成回调,然后试图从队列中获取与期货一样多的项目。asyncio.as_completed如何工作

代码的核心是如下:

def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

我想理解这段代码是如何工作的。我最大的疑问是:

  • 循环在哪里实际运行。我没有看到任何对loop.run_until_cobmplete或loop.run_forever的调用。那么循环如何进展?

  • 方法文档说该方法返回期货。那你可以把它在as_completed(期货)类似

    为F: 结果=产量是F

我无法调和,对在_wait_for_one返回f.result线。记录的调用约定是否正确?如果是这样,那么收益率从哪里来?

回答

1

您复制的代码缺少标题部分,这非常重要。

# This is *not* a @coroutine! It is just an iterator (yielding Futures). 
def as_completed(fs, *, loop=None, timeout=None): 
    """Return an iterator whose values are coroutines. 

    When waiting for the yielded coroutines you'll get the results (or 
    exceptions!) of the original Futures (or coroutines), in the order 
    in which and as soon as they complete. 

    This differs from PEP 3148; the proper way to use this is: 

     for f in as_completed(fs): 
      result = yield from f # The 'yield from' may raise. 
      # Use result. 

    If a timeout is specified, the 'yield from' will raise 
    TimeoutError when the timeout occurs before all Futures are done. 

    Note: The futures 'f' are not necessarily members of fs. 
    """ 
    if futures.isfuture(fs) or coroutines.iscoroutine(fs): 
     raise TypeError("expect a list of futures, not %s" % type(fs).__name__) 
    loop = loop if loop is not None else events.get_event_loop() 
    todo = {ensure_future(f, loop=loop) for f in set(fs)} 
    from .queues import Queue # Import here to avoid circular import problem. 
    done = Queue(loop=loop) 
    timeout_handle = None 

    def _on_timeout(): 
     for f in todo: 
      f.remove_done_callback(_on_completion) 
      done.put_nowait(None) # Queue a dummy value for _wait_for_one(). 
     todo.clear() # Can't do todo.remove(f) in the loop. 

    def _on_completion(f): 
     if not todo: 
      return # _on_timeout() was here first. 
     todo.remove(f) 
     done.put_nowait(f) 
     if not todo and timeout_handle is not None: 
      timeout_handle.cancel() 

    @coroutine 
    def _wait_for_one(): 
     f = yield from done.get() 
     if f is None: 
      # Dummy value from _on_timeout(). 
      raise futures.TimeoutError 
     return f.result() # May raise f.exception(). 

    for f in todo: 
     f.add_done_callback(_on_completion) 
    if todo and timeout is not None: 
     timeout_handle = loop.call_later(timeout, _on_timeout) 
    for _ in range(len(todo)): 
     yield _wait_for_one() 

[哪里循环实际运行?]

对于semplicity起见,假设超时设置为无。

as_completed期望可迭代期货,而不是协同程序。所以这个期货已经被绑定到循环并且被安排执行。换句话说,那些期货是loop.create_task或asyncio.ensure_futures的输出(并且这是无处显示的)。 因此,循环已经“运行”了它们,当它们完成时,它们的未来.done()方法将返回True。

然后创建“完成”队列。请注意,“完成”队列是asyncio.queue的遗传,即使用循环«实现阻塞方法(.get,.put)»的队列。

通过“todo = {...”这一行,每个协程的未来(这是fs的一个元素)被绑定到另一个未来»绑定到循环«,并且这个最后的未来的done_callback被设置为调用_on_completion功能。

当循环将完成协程的执行时,将调用_on_completion函数,该协程的期限在设置为as_completed函数的“fs”中传递。

_on_completion函数从待办事项集中移除“我们的未来”,并将其结果(即未来在“fs”集中的协同程序)放入完成队列中。 换句话说,as_completed函数所做的全部工作就是将这些期货附加到done_callback中,以便将原始未来的结果移入已完成的队列。

然后,对于len(fs)== len(todo)次,as_completed函数会生成一个协程,用于阻止“从完成产出”。get()“,等待_on_completed(或_on_timeout)函数将结果放入完成队列中。

as_completed调用者执行的”yield from“s将等待显示结果在

完成的队列。[哪里是产量从何而来?]

它来自于一个事实,即待办事项是asyncio.queue,这样你就可以(asyncio-)块,直到值。把( )在队列中。