
I'm using the following code to issue urllib2 requests from multiple threads. However, what is the best way to limit the number of threads it consumes?

import Queue
import threading
import urllib2

class ApiMultiThreadHelper:

    def __init__(self, api_calls):
        self.q = Queue.Queue()
        self.api_datastore = {}
        self.api_calls = api_calls
        self.userpass = '#####'

    def query_api(self, q, api_query):
        # push the result of the request onto the shared queue
        q.put(self.issue_request(api_query))

    def issue_request(self, api_query):
        self.api_datastore[api_query] = {}

        for lookup in ["call1", "call2"]:
            query = api_query + lookup

            request = urllib2.Request(query)
            request.add_header("Authorization", "Basic %s" % self.userpass)
            f = urllib2.urlopen(request)
            response = f.read()
            f.close()

            self.api_datastore[api_query][lookup] = response

        return True

    def go(self):
        # one thread per API call -- nothing bounds the thread count here
        threads = []
        for i in self.api_calls:
            t = threading.Thread(target=self.query_api, args=(self.q, i))
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

Answer


You should use a thread pool. Here is an implementation I wrote years ago (Python 3.x friendly):

import traceback
from threading import Thread
try:
    import queue as Queue  # Python 3.x
except ImportError:
    import Queue  # Python 2.x

class ThreadPool(object):
    def __init__(self, no=10):
        self.alive = True
        self.tasks = Queue.Queue()
        self.threads = []
        for _ in range(no):
            t = Thread(target=self.worker)
            t.start()
            self.threads.append(t)

    def worker(self):
        while self.alive:
            # poll with a timeout so the loop can notice deactivate()
            try:
                fn, args, kwargs = self.tasks.get(timeout=0.5)
            except Queue.Empty:
                continue
            except ValueError:
                # malformed task tuple: mark it done and move on
                self.tasks.task_done()
                continue

            try:
                fn(*args, **kwargs)
            except Exception:
                # might wanna add some better error handling
                traceback.print_exc()

            self.tasks.task_done()

    def add_job(self, fn, args=(), kwargs=None):
        self.tasks.put((fn, args, kwargs or {}))

    def join(self):
        # block until every queued task has been processed
        self.tasks.join()

    def deactivate(self):
        # tell the workers to stop polling, then wait for them to exit
        self.alive = False
        for t in self.threads:
            t.join()
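
For illustration, a minimal standalone use of this class might look like the following; the fetch function and the URLs are made-up placeholders, not part of the original code:

def fetch(url):
    print("fetching %s" % url)  # stand-in for real work, e.g. a urllib2 request

tp = ThreadPool(4)  # never more than 4 jobs running at once
for url in ["http://example.com/a", "http://example.com/b"]:
    tp.add_job(fetch, args=(url,))
tp.join()        # wait until every queued job has run
tp.deactivate()  # shut the worker threads down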

You can also find a similar class in the multiprocessing.pool module (don't ask me why it lives there). You can then refactor your code like this:

def go(self):
    tp = ThreadPool(20)  # <-- 20 worker threads
    for i in self.api_calls:
        tp.add_job(self.query_api, args=(self.q, i))
    tp.join()
    tp.deactivate()
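
If you would rather not maintain the pool class yourself, here is a sketch of the same go() built on the multiprocessing.pool.ThreadPool mentioned above; apply_async, close, and join are standard methods of that class, but this version is untested against your API:

from multiprocessing.pool import ThreadPool as MPThreadPool

def go(self):
    pool = MPThreadPool(processes=20)  # cap the pool at 20 worker threads
    for i in self.api_calls:
        pool.apply_async(self.query_api, args=(self.q, i))
    pool.close()  # no more jobs will be submitted
    pool.join()   # wait until all submitted jobs have finished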

The number of threads is now defined a priori.
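
As an aside, on Python 3.2+ (or Python 2 with the futures backport), the standard library's concurrent.futures.ThreadPoolExecutor gives you the same bounded pool without any hand-rolled worker code; a minimal sketch:

from concurrent.futures import ThreadPoolExecutor

def go(self):
    with ThreadPoolExecutor(max_workers=20) as executor:
        for i in self.api_calls:
            executor.submit(self.query_api, self.q, i)
    # leaving the with-block waits for all submitted calls to complete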