2013-02-05 36 views
3

Python中是否有一个简单的ThreadPool库,它提供类似pool.execute(function, args)的方法,并且在池已满时会阻塞(直到其中一个线程变为空闲状态)?是否存在一个与Java的FixedThreadPool相当的Python库?

我已经尝试过multiprocessing包中的ThreadPool,但是它的pool.apply_async()函数在池已满时不会阻塞。其实,我完全没有弄明白它的行为。

回答

2

ActiveState Code Recipes页面上有一个基于Python Queue实现阻塞的线程池。您可以使用add_task来代替execute。

## {{{ http://code.activestate.com/recipes/577187/ (r9) 
from Queue import Queue 
from threading import Thread 

class Worker(Thread):
    """Daemon thread that executes tasks pulled from a shared queue.

    Every queue item is a ``(func, args, kargs)`` tuple. The thread starts
    itself as soon as it is constructed and loops forever.
    """

    def __init__(self, tasks):
        """Remember the task queue and start the thread.

        tasks: queue object providing ``get()`` and ``task_done()``.
        """
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Fetch and execute tasks one at a time, forever."""
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception as e:
                # Best effort: report the failure and keep serving tasks.
                print(e)
            finally:
                # Always acknowledge the item; otherwise a task that raises
                # something other than Exception (e.g. SystemExit) would
                # leave the queue's join() blocked forever.
                self.tasks.task_done()

class ThreadPool(object):
    """Fixed-size pool of worker threads fed from a bounded queue.

    The queue holds at most ``num_threads`` pending tasks, so ``add_task``
    blocks while the queue is full.
    """

    def __init__(self, num_threads):
        """Create the task queue and start ``num_threads`` workers.

        Raises ValueError if ``num_threads`` is less than 1: a size of zero
        would create an unbounded queue with no workers to drain it, so
        ``wait_completion`` could never return.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.tasks = Queue(num_threads)
        # Keep references so the workers can be inspected later.
        self.workers = [Worker(self.tasks) for _ in range(num_threads)]

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)``; blocks while the queue is full."""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()

if __name__ == '__main__':
    # Demo: push 100 sleeping tasks through a 20-thread pool.
    from random import randrange
    from time import sleep

    # One random delay (1-9 seconds) per demo task.
    delays = [randrange(1, 10) for _ in range(100)]

    def wait_delay(d):
        """Announce the delay, then sleep for d seconds."""
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('sleeping for (%d)sec' % d)
        sleep(d)

    # 1) Init a Thread pool with the desired number of threads
    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        # print the percentage of tasks placed in the queue
        print('%.2f%%' % ((float(i) / float(len(delays))) * 100.0))

        # 2) Add the task to the queue
        pool.add_task(wait_delay, d)

    # 3) Wait for completion
    pool.wait_completion()
## end of http://code.activestate.com/recipes/577187/ }}} 
+0

这是完美的,谢谢。 – Ognjen

-1
from multiprocessing.pool import ThreadPool 
+0

ThreadPool在池已满时不支持阻塞行为……正如Ognjen在他的问题中也提到的…… –

0

我修改了@ckhan的答案,并添加了回调功能。

#Custom ThreadPool implementation from http://code.activestate.com/recipes/577187/ 
# Usage: 
#  def worker_method(data): 
#   ''' do processing ''' 
#   return data 
#   
#  def on_complete_callback(data, isfailed): 
#   print ('on complete %s' % data) 
#   
#  pool = ThreadPool(5) 
#  for data in range(1,10):
#   pool.add_task(worker_method, on_complete_callback, data)  
#  pool.wait_completion() 

from Queue import Queue 
from threading import Thread 
import thread 

class Worker(Thread):
    """Daemon thread that runs queued tasks and reports each via a callback.

    Every queue item is a ``(func, callback, args, kargs)`` tuple. When
    ``func(*args, **kargs)`` returns, ``callback(result, False)`` is called;
    when it raises ``Exception``, ``callback(exception, True)`` is called.
    The callback is invoked exactly once per task.
    """

    def __init__(self, tasks, thread_id):
        """Remember the queue and id, then start the thread.

        tasks: queue object providing ``get()`` and ``task_done()``.
        thread_id: caller-chosen identifier stored on the instance.
        """
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        # Assign before start() so the attribute exists once run() is live.
        self.thread_id = thread_id
        self.start()

    def run(self):
        """Fetch and execute tasks one at a time, forever."""
        import traceback

        while True:
            func, callback, args, kargs = self.tasks.get()
            try:
                # Run the task by itself so an error raised by the callback
                # is never mistaken for a task failure (which used to
                # trigger a second callback invocation).
                try:
                    data, failed = func(*args, **kargs), False
                except Exception as e:
                    data, failed = e, True
                callback(data, failed)
            except Exception:
                # The callback itself failed: report it and keep the worker
                # alive instead of silently shrinking the pool.
                traceback.print_exc()
            finally:
                # Always acknowledge the item so the queue's join() cannot
                # hang on a failed task or callback.
                self.tasks.task_done()

class ThreadPool(object):
    """Fixed-size pool of worker threads that report results via callbacks.

    The queue holds at most ``num_threads`` pending tasks, so ``add_task``
    blocks while the queue is full.
    """

    def __init__(self, num_threads):
        """Create the task queue and start ``num_threads`` workers.

        Each worker receives its index (0 .. num_threads - 1) as its id.
        Raises ValueError if ``num_threads`` is less than 1: a size of zero
        would create an unbounded queue with no workers to drain it, so
        ``wait_completion`` could never return.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.tasks = Queue(num_threads)
        # Keep references so the workers can be inspected later.
        self.workers = [Worker(self.tasks, i) for i in range(num_threads)]

    def add_task(self, func, callback, *args, **kargs):
        """Queue ``func(*args, **kargs)`` together with its callback.

        Blocks while the queue is full. The item is stored as the tuple
        ``(func, callback, args, kargs)``.
        """
        self.tasks.put((func, callback, args, kargs))

    def wait_completion(self):
        """Block until every queued task has been marked done."""
        self.tasks.join()
相关问题