2017-12-27 890 views
2

我与中信高科工作后,但我有点卡住了。我打电话给3个不同的API,每个都有自己的响应时间。打印响应部分完成Python的异步事件循环,同时还完成任务的响应

我想创建一个超时功能,它为每个任务返回一个可接受的时间。但是如果时间任务在可接受的时间内没有完成,我想返回部分数据,因为我不需要一个完整的数据集,速度更关注。

不过,我想保持未完成的任务工作,直到完成(即请求API数据插入到一个Postgres数据库。

我想知道,如果我们能做到这一点,而无需使用某种调度到保持背景中运行的任务。

+0

你熟悉的听众/观察者模式?它经常用于异步回调,可以在任何时候返回 –

+0

不,但是你有任何链接吗? –

+0

谷歌搜索'Python观察者模式'出现了这个:http://www.giantflyingsaucer.com/blog/?p=5117。否则,只需要寻找帮助你的教程 –

回答

1

但如果时间任务不是可接受的时间内完成,我想 返回部分数据,因为我并不需要一个完整的数据集和速度 更多的是关注的焦点。

但是,我想保持未完成的任务,直到完成

所以其他任务是独立于超时任务的状态,对吗?如果我正确地理解了你,你只想用他们自己的超时运行3 asyncio.Task,并在最后汇总他们的结果。

唯一可能的问题,我看到的是“想返回部分数据”,因为它很可能的事情如何组织有所不同,但我们可能只需要通过这个“部分数据”里面的任务上调超时被取消例外。

这里的小原型:

import asyncio 


class PartialData(Exception): 
    def __init__(self, data): 
     super().__init__() 
     self.data = data   


async def api_job(i): 
    data = 'job {i}:'.format(i=i) 
    try: 
     await asyncio.sleep(1) 
     data += ' step 1,' 
     await asyncio.sleep(2) 
     data += ' step 2,' 
     await asyncio.sleep(2) 
     data += ' step 3.' 
    except asyncio.CancelledError as exc: 
     raise PartialData(data) # Pass partial data to outer code with our exception. 
    else: 
     return data 


async def api_task(i, timeout): 
    """Wrapper for api_job to run it with timeout and retrieve it's partial data on timeout.""" 
    t = asyncio.ensure_future(api_job(i)) 
    try: 
     await asyncio.wait_for(t, timeout) 
    except asyncio.TimeoutError: 
     try: 
      await t 
     except PartialData as exc: 
      return exc.data # retrieve partial data on timeout and return it. 
    else: 
     return t.result() 


async def main(): 
    # Run 3 jobs with different timeouts: 
    results = await asyncio.gather(
     api_task(1, timeout=2), 
     api_task(2, timeout=4), 
     api_task(3, timeout=6), 
    ) 

    # Print results including "partial data": 
    for res in results: 
     print(res) 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 
     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 

输出:

job 1: step 1, 
job 2: step 1, step 2, 
job 3: step 1, step 2, step 3. 

(你可以看到前两个作业完成了超时和检索他们的DATAS中的一部分)

UPD:

复杂的例子包含可能的解决方案不同的事件:

import asyncio 
from contextlib import suppress 


async def stock1(_): 
    await asyncio.sleep(1) 
    return 'stock1 res' 

async def stock2(exception_in_2): 
    await asyncio.sleep(1) 
    if exception_in_2: 
     raise ValueError('Exception in stock2!') 
    await asyncio.sleep(1) 
    return 'stock2 res' 

async def stock3(_): 
    await asyncio.sleep(3) 
    return 'stock3 res' 


async def main(): 
    # Vary this values to see different situations: 
    timeout = 2.5 
    exception_in_2 = False 


    # To run all three stocks just create tasks for them: 
    tasks = [ 
     asyncio.ensure_future(s(exception_in_2)) 
     for s 
     in (stock1, stock2, stock3) 
    ] 


    # Now we just wait until one of this possible situations happened: 
    # 1) Everything done 
    # 2) Exception occured in one of tasks 
    # 3) Timeout occured and at least two tasks ready 
    # 4) Timeout occured and less than two tasks ready 
    # (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait) 
    await asyncio.wait(
     tasks, 
     timeout=timeout, 
     return_when=asyncio.FIRST_EXCEPTION 
    ) 

    is_success = all(t.done() and not t.exception() for t in tasks) 
    is_exception = any(t.done() and t.exception() for t in tasks) 
    is_good_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) >= 2 
    is_bad_timeout = \ 
     not is_success and \ 
     not is_exception and \ 
     sum(t.done() for t in tasks) < 2 


    # If success, just print all results: 
    if is_success: 
     print('All done before timeout:') 
     for t in tasks: 
      print(t.result()) 
    # If timeout, but at least two done, 
    # print it leaving pending task to be executing. 
    # But note two important things: 
    # 1) You should guarantee pending task done before loop closed 
    # 2) What if pending task will finish with error, is it ok? 
    elif is_good_timeout: 
     print('Timeout, but enought tasks done:') 
     for t in tasks: 
      if t.done(): 
       print(t.result()) 
    # Timeout and not enought tasks done, 
    # let's just cancel all hanging:  
    elif is_bad_timeout: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Timeout and not enought tasks done') # You probably want indicate fail 
    # If any of tasks is finished with an exception, 
    # we should probably cancel unfinished tasks, 
    # await all tasks done and retrive all exceptions to prevent warnings 
    # (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    elif is_exception: 
     await cancel_and_retrieve(tasks) 
     raise RuntimeError('Exception in one of tasks') # You probably want indicate fail 


async def cancel_and_retrieve(tasks): 
    """ 
    Cancel all pending tasks, retrieve all exceptions 
    (https://docs.python.org/3/library/asyncio-dev.html#detect-exceptions-never-consumed) 
    It's cleanup function if we don't want task being continued. 
    """ 
    for t in tasks: 
     if not t.done(): 
      t.cancel() 
    await asyncio.wait(
     tasks, 
     return_when=asyncio.ALL_COMPLETED 
    ) 
    for t in tasks: 
     with suppress(Exception): 
      await t 


if __name__ == '__main__': 
    loop = asyncio.get_event_loop() 
    try: 
     loop.run_until_complete(main()) 
    finally: 

     # If some tasks still pending (is_good_timeout case), 
     # let's kill them: 
     loop.run_until_complete(
      cancel_and_retrieve(asyncio.Task.all_tasks()) 
     ) 

     loop.run_until_complete(loop.shutdown_asyncgens()) 
     loop.close() 
+0

让我们用这个逻辑一秒钟,让我们说你正在显示股票。有3只股票。你要求股票的价格,但股票2需要更长的时间。而不是等待,我们只显示股票价格1和3. –

+0

@StandardCitizen如果股票2在1或3之前到达,我们做什么? –

+0

感谢您的回复,让我们在允许的时间内说明2次到达并输入数据库。第三场比赛时间太长。因此,我们将返回请求,但第三个进程将保持打开状态并执行该任务直至完成。希望在刷新用户之后,它就会完成。可以说这些股票每天只开一次。 –