2017-08-30 35 views
11

在使用aiohttp将某些同步代码移动到asyncio的过程中。同步代码需要15分钟才能运行,所以我希望能够改善这一点。Python aiohttp/asyncio - 如何处理返回的数据

我有一些工作代码,它从一些网址获取数据并返回每个网址的正文。但这只是针对1个实验室网站,我有70多个实际网站。

因此,如果我有一个循环来创建所有网站的列表,所有的网站将使700个网址在列表中进行处理。现在处理它们我不认为是一个问题?

但是做结果的'东西',我不知道如何编程?我的代码已经可以对每个返回的结果执行“填充”,但我不确定如何针对正确的结果类型进行编程。

当代码运行时,它会处理所有的url,并根据运行时间,返回一个未知的命令?

我需要一个函数来处理任何类型的结果吗?

import asyncio, aiohttp, ssl 
from bs4 import BeautifulSoup 

def page_content(page): 
    return BeautifulSoup(page, 'html.parser') 


async def fetch(session, url): 
    with aiohttp.Timeout(15, loop=session.loop): 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    tasks = [] 
    # Fetch all responses within one Client session, 
    # keep connection alive for all requests. 
    async with aiohttp.ClientSession(auth=aiohttp.BasicAuth(username, password)) as session: 
     for i in urls: 
      task = asyncio.ensure_future(fetch(session, i)) 
      tasks.append(task) 

     responses = await asyncio.gather(*tasks) 
     # you now have all response bodies in this variable 
     for i in responses: 
      print(i.title.text) 
     return responses 


def main(): 
    username = 'monitoring' 
    password = '*********' 
    ip = '10.10.10.2' 
    urls = [ 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.0.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'frontend.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'planner.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.10.10.1'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.11.11.1'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'10.12.12.60'), 
     'http://{0}:8444/level/15/exec/-/ping/{1}/timeout/1/source/vlan/5/CR'.format(ip,'lon-dc-01.domain.com'), 
     'http://{0}:8444/level/15/exec/-/traceroute/{1}/source/vlan/5/probe/2/timeout/1/ttl/0/10/CR'.format(ip,'lon-dc-01.domain.com'), 
     ] 
    loop = asyncio.get_event_loop() 
    future = asyncio.ensure_future(get_url_data(urls,username,password)) 
    data = loop.run_until_complete(future) 
    print(data) 

if __name__ == "__main__": 
    main() 

回答

2

你的代码离商标不远。 asyncio.gather以参数的顺序返回结果,因此顺序在此保留,但不会按顺序调用page_content

了一些调整:

首先,你不需要ensure_future这里。创建一个任务只有在你想让一个协程远远超过它的父项时才需要,即,如果任务必须继续运行,即使创建它的函数已经完成。在这里,你需要与你的协同程序被调用,而不是直接asyncio.gather什么:

async def get_url_data(urls, username, password): 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(fetch(session, i) for i in urls)) 
    for i in responses: 
     print(i.title.text) 
    return responses 

调用,这将安排在同一时间全部取出,并用大量的URL,这是远远没有达到最佳。相反,您应该选择最大并发性,并确保至多X次提取正在随时运行。要实现这一点,你可以使用一个asyncio.Semaphore(20),这个信号量最多只能获得20个协程,所以其他人将等待获取,直到有一个点。

CONCURRENCY = 20 
TIMEOUT = 15 

async def fetch(session, sem, url): 
    async with sem: 
     async with session.get(url) as response: 
      return page_content(await response.text()) 

async def get_url_data(urls, username, password): 
    sem = asyncio.Semaphore(CONCURRENCY) 
    async with aiohttp.ClientSession(...) as session: 
     responses = await asyncio.gather(*(
      asyncio.wait_for(fetch(session, sem, i), TIMEOUT) 
      for i in urls 
     )) 
    for i in responses: 
     print(i.title.text) 
    return responses 

这样,所有提取立即开始,但只有20个将能够获得信号量。其他的将在第一个async with指令处阻塞,并等待另一个获取完成。

我也用这里的官方asyncio替代了aiohttp.Timeout。

最后,对于数据的实际处理,如果受到CPU时间的限制,asyncio可能对您没有多大帮助。这里需要使用ProcessPoolExecutor将实际工作并行到另一个CPU。 run_in_executor可能会用于。

+0

谢谢,我明白你所说的一切,但你在ProcessPoolExecutor部分已经失去了我。我需要一个独立的CPU进程的结果?我该怎么做呢?以及如何按顺序处理它们,还是需要一个处理所有结果的函数? – AlexW

2

下面是concurrent.futures.ProcessPoolExecutor的示例。如果创建时未指定max_workers,则实现将使用os.cpu_count来代替。另请注意,asyncio.wrap_future是公开的,但没有记录。或者,有AbstractEventLoop.run_in_executor

import asyncio 
from concurrent.futures import ProcessPoolExecutor 

import aiohttp 
import lxml.html 


def process_page(html): 
    '''Meant for CPU-bound workload''' 
    tree = lxml.html.fromstring(html) 
    return tree.find('.//title').text 


async def fetch_page(url, session): 
    '''Meant for IO-bound workload''' 
    async with session.get(url, timeout = 15) as res: 
     return await res.text() 


async def process(url, session, pool): 
    html = await fetch_page(url, session) 
    return await asyncio.wrap_future(pool.submit(process_page, html)) 


async def dispatch(urls): 
    pool = ProcessPoolExecutor() 
    async with aiohttp.ClientSession() as session: 
     coros = (process(url, session, pool) for url in urls) 
     return await asyncio.gather(*coros) 


def main(): 
    urls = [ 
     'https://stackoverflow.com/', 
     'https://serverfault.com/', 
     'https://askubuntu.com/', 
     'https://unix.stackexchange.com/' 
    ] 
    result = asyncio.get_event_loop().run_until_complete(dispatch(urls)) 
    print(result) 

if __name__ == '__main__': 
    main() 
相关问题