2014-09-21 24 views
1

我想写一个python脚本发送多个DNS请求,为每个请求使用不同的名称服务器。Python:实现并发DNS请求(流水线)

使用dnspython实现顺序解决方案很容易,但对我来说太慢了。 使用线程池添加并发是不可能的,因为在我的情况下,所有请求都使用相同的源端口(REUSE_ADDRESS在这里也不会有帮助)。

基于上述原因,我想使用下述溶液(开沟使用dnspython的解析器模块,但利用其信息建设的优势和解析模块):

  • 允许多达X请求是正在进行
  • 发送X同时请求(只是使用UDP与添加的延迟发送DNS请求报文。大概之间发送,以避免脉冲串)
  • 不同的线程等待响应
  • 当响应到达与它匹配请求(按地址),并允许新的请求运行
  • 如果对请求的响应不到位内TIMEOUT秒将其标记为已完成,并允许新的请求运行

我的主要问题,这里有:

  • 如何实现任务超时容易
  • 是否有可能实现它,而无需使用线程同步的(例如,使用事件循环?)
  • 是否有任何现有的库,可以帮助实现它(它真的感觉就像我试图在这里重新发明轮子,我看着asycnio模块,但无法找到一种方法来利用对我的问题)。请注意,我不想使用现有的DNS或网络库,因为我需要灵活地更改核心功能(例如使用原始套接字,更改DNS头字段等)。

回答

0

你试过aiodns包吗? https://pypi.python.org/pypi/aiodns/

超时asyncio有标准wait_for协同程序(https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for)。

+0

我需要为每个请求使用不同的名称服务器,它似乎像aiodns不是为此设计的。 – roee88 2014-09-28 13:55:41

+0

您可以为每个名称服务器创建一个'DNSResolver'实例,并通过'nameservers'参数指定所需的DNS IP。 'DNSResolver'的Instantiatig是相对轻量级的任务。 – 2014-09-29 07:59:09

0

使用简单的选择循环在这里工作得很好。这是一个完成代码片段:

def run(self, resolvers_iter): 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) 
    sock.setblocking(False) 

    try: 
     pending = [] 

     # For rate limiting 
     limit = float(self.timeout)/self.max_pending # delay between sends 
     last_sent = clock() - limit 

     # Work as long as there are more resolvers to query 
     completed_sending = False 
     while not completed_sending or pending: 

      # Can I send more requests 
      want_to_write = False 
      if not completed_sending and len(pending) < self.max_pending: 
       want_to_write = True 

      # Calculate nearest timeout time to make sure select returns on time 
      timeout = None 
      if pending: 
       timeout = self.timeout - clock() + pending[0][0] + 0.001 
       timeout = max(timeout, 0) 

      # Rate limit 
      time_passed_since_send = clock() - last_sent 
      if want_to_write and time_passed_since_send + 0.001 < limit: 
       timeout = min(timeout, limit-time_passed_since_send) 
       timeout = max(timeout, 0) 
       want_to_write = False 

      # Poll socket - uses internally the select module 
      readable, writable = self._select(readable=True, writable=want_to_write, timeout=timeout) 

      # Can read 
      if readable: 
       # Read as many as possible 
       while True: 
        try: 
         # Get response 
         response, from_address = DnsFacilities.read_response(sock) 

         # Check if not duplicate or already timed out 
         sent_time = None 
         for i, (t, ip) in enumerate(pending): 
          if ip == from_address[0]: 
           sent_time = t 
           del pending[i] 
           break 

         if sent_time is not None: 
          self.response_received((response, from_address, clock()-sent_time)) 

        except socket.error, e: 
         if e[0] in (socket.errno.EWOULDBLOCK, socket.errno.EAGAIN): 
          break 
         elif e[0] in (socket.errno.WSAECONNRESET, socket.errno.WSAENETRESET): 
          pass 
         else: 
          raise 

      # Can write 
      if writable: 
       try: 
        last_sent = clock() 
        resolver_address = resolvers_iter.next() 
        DnsFacilities.send_query(resolver_address) 
        pending.append((clock(), resolver_address) 
       except StopIteration: 
        completed_sending = True 

      # Check for timed out tasks 
      now = clock() 
      while pending and now - pending[0][0] > self.timeout: 
       self.response_timeout(pending[0][1]) 
       del pending[0] 

    finally: 
     sock.close()