2012-09-07 54 views
15

如果我有两个threading.Event()对象,并希望直到它们中的任何一个都被设置好,是否有一种有效的方法可以在Python中执行此操作?很明显,我可以使用轮询/超时进行一些操作,但是我希望确保线程处于休眠状态,直到设置为止,类似于文件描述符如何使用selectPython线程:我可以同时睡在两个threading.Event()吗?

所以在下面的实现中,wait_for_either的高效非轮询实现是什么样的?

a = threading.Event() 
b = threading.Event() 

wait_for_either(a, b) 
+0

是否有充分的理由使用2个不同的事件而不使用同一个事件? –

+0

@Iulius你有一个单线程,你想成为事件驱动,但有2个队列...所以你需要醒来,当任何时候得到一个项目 – pyInTheSky

+0

我很惊讶Python没有这个内置的。 –

回答

18

这里是一个非轮询非过多的线程解决方案:修改现有Event s到火回调时,他们改变,并处理该回调设置一个新的事件:

import threading 

def or_set(self): 
    self._set() 
    self.changed() 

def or_clear(self): 
    self._clear() 
    self.changed() 

def orify(e, changed_callback): 
    e._set = e.set 
    e._clear = e.clear 
    e.changed = changed_callback 
    e.set = lambda: or_set(e) 
    e.clear = lambda: or_clear(e) 

def OrEvent(*events): 
    or_event = threading.Event() 
    def changed(): 
     bools = [e.is_set() for e in events] 
     if any(bools): 
      or_event.set() 
     else: 
      or_event.clear() 
    for e in events: 
     orify(e, changed) 
    changed() 
    return or_event 

用法示例:

其中
def wait_on(name, e): 
    print "Waiting on %s..." % (name,) 
    e.wait() 
    print "%s fired!" % (name,) 

def test(): 
    import time 

    e1 = threading.Event() 
    e2 = threading.Event() 

    or_e = OrEvent(e1, e2) 

    threading.Thread(target=wait_on, args=('e1', e1)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('e2', e2)).start() 
    time.sleep(0.05) 
    threading.Thread(target=wait_on, args=('or_e', or_e)).start() 
    time.sleep(0.05) 

    print "Firing e1 in 2 seconds..." 
    time.sleep(2) 
    e1.set() 
    time.sleep(0.05) 

    print "Firing e2 in 2 seconds..." 
    time.sleep(2) 
    e2.set() 
    time.sleep(0.05) 

其结果是:

Waiting on e1... 
Waiting on e2... 
Waiting on or_e... 
Firing e1 in 2 seconds... 
e1 fired!or_e fired! 

Firing e2 in 2 seconds... 
e2 fired! 

钍应该是线程安全的。欢迎任何评论。

编辑:哦,这里是你的wait_for_either函数,虽然我写代码的方式,最好是制作并传递一个or_event。请注意,不应手动设置或清除or_event

def wait_for_either(e1, e2): 
    OrEvent(e1, e2).wait() 
+2

这很好!然而,我看到一个问题:如果你“同化”两次相同的事件,每当你设置或清除它时,你都会遇到一个无限循环。 – Vincent

+0

这是一个好点!很快会修改 – Claudiu

+0

非常感谢!这正是我所期待的。你是否同意让这个答案中的代码在开源许可条款下使用? BSD或MIT将是理想的,因为它们与Numpy,Pandas,Scipy等兼容。 – naitsirhc

4

一个解决方案(与轮询)将做每个Event顺序等待一个循环

def wait_for_either(a, b): 
    while True: 
     if a.wait(tunable_timeout): 
      break 
     if b.wait(tunable_timeout): 
      break 

我认为,如果您调整超时不够好,效果也OK。


最好的非投票我能想到的是在不同的线程等待每一个,并设置一个共享Event的人,你会在主线程等待后。

def repeat_trigger(waiter, trigger): 
    waiter.wait() 
    trigger.set() 

def wait_for_either(a, b): 
    trigger = threading.Event() 
    ta = threading.Thread(target=repeat_trigger, args=(a, trigger)) 
    tb = threading.Thread(target=repeat_trigger, args=(b, trigger)) 
    ta.start() 
    tb.start() 
    # Now do the union waiting 
    trigger.wait() 

非常有趣,所以我写了以前的解决方案的OOP版本:

class EventUnion(object): 
    """Register Event objects and wait for release when any of them is set""" 
    def __init__(self, ev_list=None): 
     self._trigger = Event() 
     if ev_list: 
      # Make a list of threads, one for each Event 
      self._t_list = [ 
       Thread(target=self._triggerer, args=(ev,)) 
       for ev in ev_list 
      ] 
     else: 
      self._t_list = [] 

    def register(self, ev): 
     """Register a new Event""" 
     self._t_list.append(Thread(target=self._triggerer, args=(ev,))) 

    def wait(self, timeout=None): 
     """Start waiting until any one of the registred Event is set""" 
     # Start all the threads 
     map(lambda t: t.start(), self._t_list) 
     # Now do the union waiting 
     return self._trigger.wait(timeout) 

    def _triggerer(self, ev): 
     ev.wait() 
     self._trigger.set() 
+1

你可以使repeat_trigger也检查触发器(超时= 0为触发器和超时> 0为服务员),以便所有线程最终结束 –

+0

我在想同样的事情,但有一个比开始2线程更好的方法... – Claudiu

0

不漂亮,但你可以使用两个额外的线程复用活动...

def wait_for_either(a, b): 
    flag = False #some condition variable, event, or similar 

    class Event_Waiter(threading.Thread): 
    def __init__(self, event): 
     self.e = event 
    def run(self): 
     self.e.wait() 
     flag.set() 

    a_thread = Event_Waiter(a) 
    b_thread = Event_Waiter(b) 
    a.start() 
    b.start() 
    flag.wait() 

请注意,如果他们太快到达,您可能不得不担心无意中收到两个事件。辅助线程(a_thread和b_thread)应该在尝试设置标志时锁定同步,然后应该终止其他线程(如果线程已被使用,可能会重置该线程的事件)。

1

启动额外的线程似乎是一个明确的解决方案,虽然不是很有效。 函数wait_events将阻塞任何一个事件被设置。

def wait_events(*events): 
    event_share = Event() 

    def set_event_share(event): 
     event.wait() 
     event.clear() 
     event_share.set() 
    for event in events: 
     Thread(target=set_event_share(event)).start() 

    event_share.wait() 

wait_events(event1, event2, event3) 
+0

很高兴知道哪一个被触发 – Har

0
def wait_for_event_timeout(*events): 
    while not all([e.isSet() for e in events]): 
     #Check to see if the event is set. Timeout 1 sec. 
     ev_wait_bool=[e.wait(1) for e in events] 
     # Process if all events are set. Change all to any to process if any event set 
     if all(ev_wait_bool): 
      logging.debug('processing event') 
     else: 
      logging.debug('doing other work') 


e1 = threading.Event() 
e2 = threading.Event() 

t3 = threading.Thread(name='non-block-multi', 
         target=wait_for_event_timeout, 
         args=(e1,e2)) 
t3.start() 

logging.debug('Waiting before calling Event.set()') 
time.sleep(5) 
e1.set() 
time.sleep(10) 
e2.set() 
logging.debug('Event is set') 
1

扩展Claudiu's答案,你可以等待事件1或事件2或事件1,甚至2

from threading import Thread, Event, _Event 

class ConditionalEvent(_Event): 
    def __init__(self, events_list, condition): 
     _Event.__init__(self) 

     self.event_list = events_list 
     self.condition = condition 

     for e in events_list: 
      self._setup(e, self._state_changed) 

     self._state_changed() 

    def _state_changed(self): 
     bools = [e.is_set() for e in self.event_list] 
     if self.condition == 'or': 

      if any(bools): 
       self.set() 
      else: 
       self.clear() 

     elif self.condition == 'and': 

      if all(bools): 
       self.set() 
      else: 
       self.clear() 

    def _custom_set(self,e): 
     e._set() 
     e._state_changed() 

    def _custom_clear(self,e): 
     e._clear() 
     e._state_changed() 

    def _setup(self, e, changed_callback): 
     e._set = e.set 
     e._clear = e.clear 
     e._state_changed = changed_callback 
     e.set = lambda: self._custom_set(e) 
     e.clear = lambda: self._custom_clear(e) 

用法示例将一如既往非常相似

import time 

e1 = Event() 
e2 = Event() 

or_e = ConditionalEvent([e1, e2], 'or') 


Thread(target=wait_on, args=('e1', e1)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('e2', e2)).start() 
time.sleep(0.05) 
Thread(target=wait_on, args=('or_e', or_e)).start() 
time.sleep(0.05) 

print "Firing e1 in 2 seconds..." 
time.sleep(2) 
e1.set() 
time.sleep(0.05) 

print "Firing e2 in 2 seconds..." 
time.sleep(2) 
e2.set() 
time.sleep(0.05) 
相关问题