2015-02-09 42 views
3

我有5,00,000个网址。并希望得到每个异步的响应。1个代理的asyncio信号量/多个池锁 - aiohttp

import aiohttp 
import asyncio  

@asyncio.coroutine 
def worker(url): 
    response = yield from aiohttp.request('GET', url, connector=aiohttp.TCPConnector(share_cookies=True, verify_ssl=False)) 
    body = yield from response.read_and_close() 

    print(url) 

def main(): 
    url_list = [] # lacs of urls, extracting from a file 

    loop = asyncio.get_event_loop() 
    loop.run_until_complete(asyncio.wait([worker(u) for u in url_list])) 

main() 

我希望每次200个连接(并发200),没有比这更多的是因为

当我运行此程序的50个URL,它工作正常,即url_list[:50] 但如果我通过整个列表,我得到这个错误

aiohttp.errors.ClientOSError: Cannot connect to host www.example.com:443 ssl:True Future/Task exception was never retrieved future: Task() 

可能是频率太高,服务器拒绝响应后限制?

回答

6

是的,人们可以期望一个服务器在导致过多的流量(不管“流量过多”的定义)后停止响应。

在这种情况下,限制并发请求数(限制它们)的一种方法是使用asyncio.Semaphore,与多线程中使用的类似:您可以创建一个信号量并确保您要调节的操作在做实际工作之前获得信号量并在之后释放。

为了您的方便,asyncio.Semaphore实现了上下文管理器,使其更容易。

大部分基本方法:

CONCURRENT_REQUESTS = 200 


@asyncio.coroutine 
def worker(url, semaphore): 
    # Aquiring/releasing semaphore using context manager. 
    with (yield from semaphore): 
     response = yield from aiohttp.request(
      'GET', 
      url, 
      connector=aiohttp.TCPConnector(share_cookies=True, 
              verify_ssl=False)) 
     body = yield from response.read_and_close() 

     print(url) 


def main(): 
    url_list = [] # lacs of urls, extracting from a file 

    semaphore = asyncio.Semaphore(CONCURRENT_REQUESTS) 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(asyncio.wait([worker(u, semaphore) for u in url_list]))