2013-07-10 289 views
12

要求是启动五个线程,并且只在最快的线程中等待。所有五个线程都去寻找相同的数据5个方向,一个足以继续控制流程。如何等待,直到在Python中完成第一个线程

实际上,我需要等待前两个线程返回来验证对方。但我猜如果我知道如何等待最快的。我可以想出如何等待第二快。

很多人都在谈论join(timeout),但是您事先不知道哪一个要等待(哪一个需要提前申请join)。

回答

1

或者只是跟踪列表中的所有完成的线程,并让第二个线程完成处理任何应该完成的任务,Python列表是线程安全的。

finished_threads = [] 
event = threading.Event() 

def func(): 
    do_important_stuff() 

    thisthread = threading.current_thread() 
    finished_threads.append(thisthread) 
    if len(finished_threads) > 1 and finished_threads[1] == thisthread: 
     #yay we are number two! 
     event.set() 

for i in range(5): 
    threading.Thread(target=func).start() 

event.wait() 
+0

这不回答关于主线程等待,直到两个线程位完成,然后继续:代替你传递的所有剩余活动到第二个线程来完成这可能不是什么都想。 – Duncan

+1

true; handle_two_threads_done()应该可以在事件上设置。编辑。 –

+0

嗯,Python列表是线程安全吗?真?我以为需要使用Queue()来实现线程一致性! –

3

如果你有某种在你的线程处理循环的,下面的代码将终止他们当一个终止使用threading.Event()

def my_thread(stop_event): 
    while not stop_event.is_set(): 
     # do stuff in a loop 

     # some check if stuff is complete 
     if stuff_complete: 
      stop_event.set() 
      break 

def run_threads(): 
    # create a thread event 
    a_stop_event = threading.Event() 

    # spawn the threads 
    for x in range(5): 
     t = threading.Thread(target=my_thread, args=[a_stop_event]) 
     t.start() 

    while not a_stop_event.is_set(): 
     # wait for an event 
     time.sleep(0.1) 

    print "At least one thread is done" 

如果你的过程是“便宜”或单请求 - 响应类型线程(即例如异步HTTP请求),那么Duncan's answer是一种好方法。

13

使用队列:建成后每个线程放在队列中的结果,然后你只需要读取结果的适当数量而忽略其余:

#!python3.3 
import queue # For Python 2.x use 'import Queue as queue' 
import threading, time, random 

def func(id, result_queue): 
    print("Thread", id) 
    time.sleep(random.random() * 5) 
    result_queue.put((id, 'done')) 

def main(): 
    q = queue.Queue() 
    threads = [ threading.Thread(target=func, args=(i, q)) for i in range(5) ] 
    for th in threads: 
     th.daemon = True 
     th.start() 

    result1 = q.get() 
    result2 = q.get() 

    print("Second result: {}".format(result2)) 

if __name__=='__main__': 
    main() 

的文档Queue.get()(不带参数它相当于Queue.get(True, None)

Queue.get([嵌段[,超时]])

卸下并从 返回一个项目队列。如果可选args块为true并且超时时间为无(默认为 ),则在必要时阻止,直到项目可用。如果超时时间为 为正数,则最多会阻止超时秒数,如果在该时间内没有可用项目,则会引发空例外情况。否则 (块为假),返回一个项目,如果一个是立即可用的,否则 引发空异常(在这种情况下超时被忽略)。

+1

如果在执行'q.get()'时Queue为空,这是否会引发'Empty'异常? – Michael

+2

@Michael,'q.get()'的默认值是做一个阻塞get,所以不会抛出异常,而是阻塞主线程,直到有可用结果。 – Duncan

1

邓肯的方法可能是最好的,我会推荐。不过,我之前一直缺乏“等待下一个完成的线程完成”的说法,我对此感到有些恼火,所以我只是写了这个文件来尝试一下。似乎工作。只需使用MWThread代替threading.thread,即可获得新的wait_for_thread功能。

全局变量有点klunky;另一种办法是让他们成为班级变量。但是,如果这是隐藏在一个模块(mwthread.py或其他),它应该没问题。

#! /usr/bin/env python 

# Example of how to "wait for"/join whichever threads is/are done, 
# in (more or less) the order they're done. 

import threading 
from collections import deque 

_monitored_threads = [] 
_exited_threads = deque() 
_lock = threading.Lock() 
_cond = threading.Condition(_lock) 

class MWThread(threading.Thread): 
    """ 
    multi-wait-able thread, or monitored-wait-able thread 
    """ 
    def run(self): 
     tid = threading.current_thread() 
     try: 
      with _lock: 
       _monitored_threads.append(tid) 
      super(MWThread, self).run() 
     finally: 
      with _lock: 
       _monitored_threads.remove(tid) 
       _exited_threads.append(tid) 
       _cond.notifyAll() 

def wait_for_thread(timeout=None): 
    """ 
    Wait for some thread(s) to have finished, with optional 
    timeout. Return the first finished thread instance (which 
    is removed from the finished-threads queue). 

    If there are no unfinished threads this returns None 
    without waiting. 
    """ 
    with _cond: 
     if not _exited_threads and _monitored_threads: 
      _cond.wait(timeout) 
     if _exited_threads: 
      result = _exited_threads.popleft() 
     else: 
      result = None 
    return result 

def main(): 
    print 'testing this stuff' 
    def func(i): 
     import time, random 
     sleeptime = (random.random() * 2) + 1 
     print 'thread', i, 'starting - sleep for', sleeptime 
     time.sleep(sleeptime) 
     print 'thread', i, 'finished' 

    threads = [MWThread(target=func, args=(i,)) for i in range(3)] 
    for th in threads: 
     th.start() 
    i = 0 
    while i < 3: 
     print 'main: wait up to .5 sec' 
     th = wait_for_thread(.5) 
     if th: 
      print 'main: got', th 
      th.join() 
      i += 1 
     else: 
      print 'main: timeout' 
    print 'I think I collected them all' 
    print 'result of wait_for_thread():' 
    print wait_for_thread() 

if __name__ == '__main__': 
    main() 
相关问题