2014-09-29 17 views
11

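How do I get results from a thread pool in Python?

I have searched here for how to do threading in Python, but so far I have not been able to find the answer I need. I am not very familiar with Python's Queue and threading modules, so some of the answers I found here make no sense to me.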

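I want to create a thread pool that I can hand different tasks to and, once they have all finished, collect their return values and process them. This is what I have tried so far, but I cannot get the results. The code I wrote is: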

from threading import Thread 
from Queue import Queue 

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.result = None
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                self.result = func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()
    def get_result(self):
        return self.result

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.results = []
        for _ in range(num_threads):
            w = Worker(self.tasks)
            self.results.append(w.get_result())
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def get_results(self):
        return self.results

def foo(word, number): 
    print word*number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
for i in range(0, len(words)): 
    pool.add_task(foo, words[i], numbers[i]) 

pool.wait_completion() 
results = pool.get_results() 
print results 

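The output prints each word repeated the given number of times, but the results list is full of None values, so where should I store the return values of func?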
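
The list holds only None because `get_result()` is called inside `ThreadPool.__init__`, right after each worker starts and before any task has been queued, so it copies the initial `self.result = None`. Below is a minimal sketch of one possible fix, written for Python 3 (`queue` instead of `Queue`, `print()` as a function). Each worker pushes its return value onto a second, thread-safe queue. The `ResultPool` class, the `results` queue, and the `index` argument are assumptions made for illustration and are not part of the code above.

```python
from queue import Queue
from threading import Thread


class ResultPool:
    """Hypothetical variant of the ThreadPool above that keeps return values."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        self.results = Queue()  # thread-safe container for return values
        for _ in range(num_threads):
            Thread(target=self._work, daemon=True).start()

    def _work(self):
        while True:
            index, func, args, kwargs = self.tasks.get()
            try:
                self.results.put((index, func(*args, **kwargs)))
            except Exception as exc:
                # Store the exception so a failed task is still visible.
                self.results.put((index, exc))
            finally:
                self.tasks.task_done()

    def add_task(self, index, func, *args, **kwargs):
        """Queue a task; `index` records the submission order."""
        self.tasks.put((index, func, args, kwargs))

    def get_results(self):
        """Block until every task is done, then return values in task order."""
        self.tasks.join()
        collected = []
        while not self.results.empty():
            collected.append(self.results.get())
        ordered = sorted(collected, key=lambda pair: pair[0])
        return [value for _, value in ordered]


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ResultPool(5)
for i, (word, number) in enumerate(zip(words, numbers)):
    pool.add_task(i, foo, word, number)
results = pool.get_results()
print(results)
```

Tagging each task with its index is one way to restore submission order, since the workers may finish in any order.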

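Or would the simpler way be to pass a dictionary (or some other variable) to my function as an extra argument to store the result in, and, after adding the task to the queue, append that result argument to a list of results: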

def foo(word, number, r): 
    print word*number 
    r[(word,number)] = number 
    return number 

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    r = {} 
    pool.add_task(foo, words[i], numbers[i], r) 
    results.append(r) 
print results 

I would be very grateful for your help.

Answers

9

Python actually has a built-in thread pool you can use; it's just not well documented:

from multiprocessing.pool import ThreadPool 

def foo(word, number):
    print(word * number)
    return number

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
results = [] 
for i in range(0, len(words)): 
    results.append(pool.apply_async(foo, args=(words[i], numbers[i]))) 

pool.close() 
pool.join() 
results = [r.get() for r in results] 
print(results)
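
One detail worth knowing about `apply_async`: the object it returns re-raises, from `get()`, any exception the task raised. The sketch below (Python 3 syntax) illustrates that behaviour under a made-up failure; the `risky` function and its failing input are hypothetical and only serve the example.

```python
from multiprocessing.pool import ThreadPool


def risky(number):
    """Hypothetical task that fails for one input."""
    if number == 3:
        raise ValueError("cannot handle 3")
    return number * 10


pool = ThreadPool(2)
pending = [pool.apply_async(risky, args=(n,)) for n in [1, 2, 3, 4]]
pool.close()
pool.join()

outcomes = []
for handle in pending:
    try:
        # get() returns the task's value, or re-raises its exception.
        outcomes.append(handle.get())
    except ValueError as exc:
        outcomes.append("failed: %s" % exc)
print(outcomes)
```

Handling the error per task this way keeps one failure from hiding the results of the other tasks.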

Or (using map instead of apply_async):

from multiprocessing.pool import ThreadPool 

def foo(word, number):
    print(word * number)
    return number

def starfoo(args):
    """
    We need this because map only supports calling functions with one arg.
    We need to pass two args, so we use this little wrapper function to
    expand a zipped list of all our arguments.
    """
    return foo(*args)

words = ['hello', 'world', 'test', 'word', 'another test'] 
numbers = [1,2,3,4,5] 
pool = ThreadPool(5) 
# We need to zip together the two lists because map only supports calling functions 
# with one argument. In Python 3.3+, you can use starmap instead. 
results = pool.map(starfoo, zip(words, numbers)) 
print(results)

pool.close() 
pool.join() 
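
For Python 3.3 and later, the `starmap` variant mentioned in the comment above removes the need for the `starfoo` wrapper. A minimal sketch, assuming Python 3 and the same `foo`, `words`, and `numbers` as in the answer:

```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]

# The with-block terminates the pool on exit; starmap has already
# returned every result by then.
with ThreadPool(5) as pool:
    # starmap unpacks each (word, number) tuple into foo's two arguments.
    results = pool.starmap(foo, zip(words, numbers))
print(results)
```
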
+0

The second case is useful when the number of tasks is the same as the size of the pool, isn't it? – 2014-09-29 16:39:57

+0

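It works fine with any number of tasks and with a `Pool` of any number of workers. `map` is useful if you want to run a function against every item of an iterable and get back the result of each call. If you have 5 workers handling an iterable of length 100, the `Pool` will call the function against all 100 items, but it will never run more than 5 threads at the same time. The output will be a list of length 100 containing the return values of all the function calls. – dano 2014-09-29 16:47:13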
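
The 5-workers, 100-items case described in this comment can be checked with a small experiment. This is only a sketch in Python 3 syntax: the `square` task, the `stats` dictionary, and the 0.01 second sleep are assumptions added to make the overlap between calls observable.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

lock = threading.Lock()
# "running": calls executing right now; "peak": highest value it reached.
stats = {"running": 0, "peak": 0}


def square(n):
    """Hypothetical task that also records how many calls overlap."""
    with lock:
        stats["running"] += 1
        stats["peak"] = max(stats["peak"], stats["running"])
    time.sleep(0.01)  # simulate a little I/O-bound work
    with lock:
        stats["running"] -= 1
    return n * n


pool = ThreadPool(5)
results = pool.map(square, range(100))
pool.close()
pool.join()

print(len(results))    # one result per input item
print(results[:5])     # results come back in input order
print(stats["peak"])   # bounded by the pool size of 5
```

`map` returns the results in the order of the inputs, regardless of which worker finished first.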

+1

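@RafaelRios Another thing to note is that, because of the [GIL](https://wiki.python.org/moin/GlobalInterpreterLock), there is no performance benefit to using threads for CPU-bound work in Python. To get around this limitation, you need to use multiple processes via the [`multiprocessing`](https://docs.python.org/2.7/library/multiprocessing.html) module. For the example above, you can make the switch by using `from multiprocessing import Pool` instead of `from multiprocessing.pool import ThreadPool`. Everything else stays the same. – dano 2014-09-29 16:50:02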
