2013-07-09

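How to get Python threads to finish gracefully

I'm working on a project that involves data collection and logging. I have 2 threads running, a collection thread and a logging thread, both started from main. I'm trying to allow the program to be terminated gracefully with Ctrl-C.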

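I'm using threading.Event to signal the threads to end their respective loops. That works fine for stopping the sim_collectData method, but it doesn't seem to stop the logData thread properly. The print statement in logData is never executed and the program just stalls. (It doesn't exit; it just sits there.)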

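The second while loop in logData is there to make sure everything in the queue gets logged. The goal is for Ctrl-C to stop the collection thread immediately, then let the logging thread finish emptying the queue, and only then terminate the program completely. (Right now the data is just printed; eventually it will be logged to a database.)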

I don't understand why the second thread never terminates. I based what I did on this answer: Stopping a thread after a certain amount of time. What am I missing?

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware.
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue, stop_event):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while not stop_event.is_set():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

    # if the stop event is received and the previous loop terminates,
    # finish logging the rest of the items in the queue.
    print "Collection terminated. Logging remaining data to database..."
    while not input_queue.empty():
        d = input_queue.get()
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1
    return


def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue, stop_event))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
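A stripped-down reproduction of the hang, written as a Python 3 sketch (`queue` instead of `Queue`). `blocking_logger` and the `waiting` event are hypothetical names added only to make the demo deterministic, and the thread is a daemon so that the script itself can still exit. After the event is set, the logger is stuck inside `get()` and never gets back to its `while` condition.

```python
import queue
import threading

def blocking_logger(q, stop_event, waiting):
    # Same shape as the first loop in logData: the event is only
    # checked between items, never while get() is waiting.
    while not stop_event.is_set():
        waiting.set()       # hypothetical hook so the demo is deterministic
        q.get()             # blocks forever on an empty queue
        q.task_done()

q = queue.Queue()
stop_event = threading.Event()
waiting = threading.Event()
t = threading.Thread(target=blocking_logger, args=(q, stop_event, waiting),
                     daemon=True)  # daemon so this demo can exit anyway
t.start()
waiting.wait()              # the logger is now committed to calling get()

stop_event.set()            # what the Ctrl-C handler in main() does
t.join(timeout=0.5)
print("logger still alive:", t.is_alive())  # logger still alive: True
```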

@tdelaney has the best solution. All of the polling/timeout answers are poor. –


You could of course always add a timeout to 'input_queue.get()'. – Jan

Answers


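The problem is that your logger is waiting on d = input_queue.get() and never checks the event. One solution is to skip the event in the logger entirely and invent a unique message that tells the logger to stop. When you receive the signal, send that message to the queue.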

import threading
import Queue
import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware.
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    input_queue.put(None)
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    while True:
        d = input_queue.get()
        if d is None:
            input_queue.task_done()
            return
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = Queue.Queue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()

main()
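The same sentinel idea as a self-contained Python 3 sketch, reduced to one producer and one consumer and ending with `join()` so the main thread waits until the queue is drained. `producer`, `consumer`, `SENTINEL`, and the `logged` list are illustrative names, not part of the answer. Because `Queue` is FIFO, everything put before the sentinel is handled before the consumer returns.

```python
import queue
import threading

SENTINEL = None  # the answer uses None as its "stop" message

def producer(q, items):
    for item in items:
        q.put(item)
    q.put(SENTINEL)  # tell the consumer that no more data is coming

def consumer(q, logged):
    while True:
        d = q.get()
        try:
            if d is SENTINEL:
                return
            if d.startswith("DATA:"):
                logged.append(d)
        finally:
            q.task_done()  # runs for data items and for the sentinel

q = queue.Queue()
logged = []
items = ["DATA: %d" % n for n in range(5)]
c = threading.Thread(target=consumer, args=(q, logged))
c.start()
producer(q, items)  # runs in the main thread for brevity
c.join()            # returns once the sentinel has been consumed
print(logged == items)  # True
```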

Note that at the end you have to put as many 'None's into the queue as there are blocked threads. – bodo


@canaaerus - That's a good point. In this case there is only 1 worker thread, but pointing out that N worker threads need N termination messages is a good addition. – tdelaney
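A Python 3 sketch of the N-worker case from these comments: each worker returns after consuming exactly one stop message, so putting one per worker (after all the real data) shuts every thread down. `STOP` is a fresh `object()` rather than `None`, a hypothetical choice that cannot collide with real data; `worker` and `N_WORKERS` are illustrative names.

```python
import queue
import threading

N_WORKERS = 3
STOP = object()  # unique sentinel; no real data item is this object

def worker(q, results, lock):
    while True:
        item = q.get()
        if item is STOP:
            q.task_done()
            return  # each worker consumes exactly one STOP
        with lock:
            results.append(item * 2)
        q.task_done()

q = queue.Queue()
results, lock = [], threading.Lock()
threads = [threading.Thread(target=worker, args=(q, results, lock))
           for _ in range(N_WORKERS)]
for t in threads:
    t.start()
for n in range(10):
    q.put(n)
for _ in threads:  # one STOP per blocked worker
    q.put(STOP)
for t in threads:
    t.join()
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```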


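You're calling get on your input_queue without a timeout, so it blocks. In logData, if you call input_queue.get() while the queue is empty, it blocks indefinitely, which keeps logging_thread from ever finishing.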

To fix this, call input_queue.get_nowait() or pass a timeout to input_queue.get().

Here is my suggestion:

import Queue
import time

def logData(input_queue, stop_event):
    n = 0

    while not stop_event.is_set():
        try:
            d = input_queue.get_nowait()
            if d.startswith("DATA:"):
                print "LOG: " + d
                n += 1
        except Queue.Empty:
            time.sleep(1)
    # note: anything still queued when stop_event is set is not logged
    return

You're also signaling the threads to terminate, but not waiting for them to do so. Consider doing that in your main function:

try:
    while True:
        time.sleep(10)
except (KeyboardInterrupt, SystemExit):
    stop_event.set()
    collection_thread.join()
    logging_thread.join()
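If you do go the timeout route suggested in the comments, the logger can still meet the question's goal of draining the queue after the stop request. A Python 3 sketch with a hypothetical `log_data` function that appends to a `logged` list instead of printing: it wakes up at least every 0.1 s (an arbitrary value) to check the event, then empties whatever is left with `get_nowait()`. The drain only sees items already queued when it runs; a `put()` that races with the stop request can still be missed, which is one reason the sentinel approach is more robust.

```python
import queue
import threading

def log_data(input_queue, stop_event, logged):
    # Phase 1: wake up at least every 0.1 s so the stop request is noticed.
    while not stop_event.is_set():
        try:
            d = input_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        logged.append(d)
        input_queue.task_done()
    # Phase 2: stop requested; log whatever is still queued without blocking.
    while True:
        try:
            d = input_queue.get_nowait()
        except queue.Empty:
            break
        logged.append(d)
        input_queue.task_done()

q = queue.Queue()
stop_event = threading.Event()
logged = []
items = ["DATA: %d" % n for n in range(3)]
for d in items:
    q.put(d)  # data collected before the stop request

t = threading.Thread(target=log_data, args=(q, stop_event, logged))
t.start()
stop_event.set()  # what the Ctrl-C handler does
t.join()          # returns promptly instead of hanging
print(logged == items)  # True
```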

Thanks! I should have read the docs more carefully... –


Yuck - there's no need for polling/sleeping here. – tdelaney


@tdelaney You may be right, and using a timeout value on 'get' is probably the better approach. But this is how I threw it together, so that's what it is. – rrhartjr


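I'm no threading expert, but the first d=input_queue.get() in your logData function is blocking: if the queue is empty, it will sit and wait forever until a message arrives on the queue. That's probably why the logData thread never terminates; it is still waiting for a queue message.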

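See the [Python docs] for changing it to a non-blocking queue read: use .get(False) or .get_nowait(). Either one needs some exception handling for the case where the queue is empty, though.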
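A small Python 3 sketch of the non-blocking and timeout reads: `get(False)` and `get_nowait()` raise `queue.Empty` immediately on an empty queue, while `get(timeout=...)` raises it only after waiting up to the timeout. The 0.2 s value is arbitrary.

```python
import queue
import time

q = queue.Queue()

# Non-blocking reads: both raise queue.Empty right away.
empty_errors = 0
for read in (lambda: q.get(False), q.get_nowait):
    try:
        read()
    except queue.Empty:
        empty_errors += 1
print("non-blocking reads that raised Empty:", empty_errors)  # 2

# Blocking read with a timeout: raises queue.Empty after about 0.2 s.
start = time.monotonic()
try:
    q.get(timeout=0.2)
except queue.Empty:
    waited = time.monotonic() - start
print("gave up after %.2f s" % waited)
```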


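Building on tdelaney's answer, I created an iterator-based approach. The iterator exits when it encounters the termination message. I also added a counter of how many get-calls are currently blocked, and the stop-method sends exactly that many termination messages. To prevent a race condition between incrementing and reading the counter, I set a stop bit there. I also don't use None as the termination message, because with a PriorityQueue it can't necessarily be compared with other data types.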
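To illustrate the PriorityQueue point in Python 3: the queue orders items with `<`, so putting `None` next to string data raises `TypeError`, whereas a sentinel built like the answer's `Final` class never compares less than anything else and therefore comes out last. The class below follows the answer's `Final` definition; the rest is a hypothetical demo.

```python
import functools
import queue

@functools.total_ordering
class Final:
    # Same idea as the answer's Final: never less than anything else.
    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

# None cannot be ordered against str in Python 3.
bad = queue.PriorityQueue()
bad.put("DATA: b")
try:
    bad.put(None)
    none_ok = True
except TypeError:
    none_ok = False
print("None accepted as sentinel:", none_ok)  # False

# A Final instance sorts after every string, so it is dequeued last.
pq = queue.PriorityQueue()
for item in ["DATA: c", Final(), "DATA: a", "DATA: b"]:
    pq.put(item)
out = [pq.get() for _ in range(4)]
print(out[:3], isinstance(out[3], Final))  # ['DATA: a', 'DATA: b', 'DATA: c'] True
```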

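There are two limitations that I had no need to remove. First, the stop-method waits until the queue is empty before shutting down the threads. Second, I don't have any code to make the queue reusable after stop. The latter could probably be added easily, while the former requires care about concurrency and about the context in which the code is used.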

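You have to decide whether you want stop to also wait for all the termination messages to be consumed. I chose to put the necessary join there, but you can just remove it.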
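Both `join()` calls in `stop()` rely on the semantics of `Queue.join()`: it blocks until `task_done()` has been called once for every item that was put, which is why the consumer loop in the usage example calls `input_queue.task_done()` for each item. A minimal Python 3 illustration with a hypothetical `consumer` function:

```python
import queue
import threading

q = queue.Queue()
events = []

def consumer():
    while True:
        item = q.get()
        events.append("got %d" % item)
        q.task_done()  # Queue.join() counts these calls, not the get() calls
        if item == 2:
            return

for n in range(3):
    q.put(n)
t = threading.Thread(target=consumer)
t.start()
q.join()  # unblocks only after the third task_done()
events.append("join returned")
t.join()
print(events)  # ['got 0', 'got 1', 'got 2', 'join returned']
```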

So here is the code:

import threading, queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "∞"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(queue.Queue):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super().__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def __next__(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super().get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, queue.PriorityQueue):
    pass

Oh, I wrote this in Python 3.2. Backported to Python 2, it looks like this:

import threading, Queue

from functools import total_ordering
@total_ordering
class Final:
    def __repr__(self):
        return "Infinity"

    def __lt__(self, other):
        return False

    def __eq__(self, other):
        return isinstance(other, Final)

Infty = Final()

class IterQueue(Queue.Queue, object):
    def __init__(self):
        self.lock = threading.Lock()
        self.stopped = False
        self.getters = 0
        super(IterQueue, self).__init__()

    def __iter__(self):
        return self

    def get(self):
        raise NotImplementedError("This queue may only be used as an iterator.")

    def next(self):
        with self.lock:
            if self.stopped:
                raise StopIteration
            self.getters += 1
        data = super(IterQueue, self).get()
        if data == Infty:
            self.task_done()
            raise StopIteration
        with self.lock:
            self.getters -= 1
        return data

    def stop(self):
        self.join()
        self.stopped = True
        with self.lock:
            for i in range(self.getters):
                self.put(Infty)
        self.join()

class IterPriorityQueue(IterQueue, Queue.PriorityQueue):
    pass

You would use it like this:

import random
import time

def sim_collectData(input_queue, stop_event):
    ''' this provides some output simulating the serial
    data from the data logging hardware.
    '''
    n = 0
    while not stop_event.is_set():
        input_queue.put("DATA: <here are some random data> " + str(n))
        stop_event.wait(random.randint(0,5))
        n += 1
    print "Terminating data collection..."
    return

def logData(input_queue):
    n = 0

    # we *don't* want to loop based on queue size because the queue could
    # theoretically be empty while waiting on some data.
    for d in input_queue:
        if d.startswith("DATA:"):
            print d
        input_queue.task_done()
        n += 1

def main():
    input_queue = IterQueue()

    stop_event = threading.Event() # used to signal termination to the threads

    print "Starting data collection thread...",
    collection_thread = threading.Thread(target=sim_collectData, args=(input_queue, stop_event))
    collection_thread.start()
    print "Done."

    print "Starting logging thread...",
    logging_thread = threading.Thread(target=logData, args=(input_queue,))
    logging_thread.start()
    print "Done."

    try:
        while True:
            time.sleep(10)
    except (KeyboardInterrupt, SystemExit):
        # stop data collection. Let the logging thread finish logging everything in the queue
        stop_event.set()
        # wait for the collector's last put() so that no data can arrive
        # after the termination messages (otherwise stop() may hang)
        collection_thread.join()
        input_queue.stop()

main()