2014-09-05 159 views
11

这应该是非常简单的,我很惊讶,我一直无法找到已经在stackoverflow上回答的问题。多线程Python中的信号处理

我有一个程序需要响应SIGTERM和SIGINT信号的守护进程,以便与新贵们一起工作。我读过这样做的最好方法是在与主线程分离的线程中运行程序的主循环,并让主线程处理信号。然后当收到一个信号时,信号处理程序应该通过设置一个常规在主循环中检查的标记标志来告诉主循环退出。

我试过这样做,但它不按我期望的方式工作。请参见下面的代码:

from threading import Thread 
import signal 
import time 
import sys 

stop_requested = False  

def sig_handler(signum, frame): 
    sys.stdout.write("handling signal: %s\n" % signum) 
    sys.stdout.flush() 

    global stop_requested 
    stop_requested = True  

def run(): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not stop_requested: 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

signal.signal(signal.SIGTERM, sig_handler) 
signal.signal(signal.SIGINT, sig_handler) 

t = Thread(target=run) 
t.start() 
t.join() 
sys.stdout.write("join completed\n") 
sys.stdout.flush() 

我测试了在以下两个方面:

1)

$ python main.py > output.txt& 
[2] 3204 
$ kill -15 3204 

2)

$ python main.py 
ctrl+c 

在这两种情况下,我希望这写到输出:

run started 
handling signal: 15 
run exited 
join completed 

在第一种情况,程序退出,但我看到的是:

run started 

在当按下CTRL + C和程序不退出SIGTERM信号被忽略貌似第二壳体。

我在这里错过了什么?

+3

尝试用'while t.is_alive():t.join(1)'替换't.join()'。您的主线程会每秒醒来检查一次信号。 – roippi 2014-09-05 01:34:51

+2

更多阅读:http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html – roippi 2014-09-05 01:37:25

回答

18

的问题是,如在解释Execution of Python signal handlers

一个Python信号处理程序不获取低级(C)信号处理器中执行。取而代之的是,低电平信号的处理程序设置一个标志,它告诉虚拟机在稍后的点(在下一字节代码指令例如),以执行相应的Python信号处理程序

...

长期运行的计算无论接收到任何信号,纯粹以C语言实现(例如,在大量文本上进行正则表达式匹配)可以在任意时间内不间断地运行。计算结束时将调用Python信号处理程序。

您的主线程被阻止在threading.Thread.join上,这最终意味着它被阻止在C调用pthread_join。当然,这不是一个“长时间运行的计算”,它是一个系统调用块......但是,直到调用结束,你的信号处理程序不能运行。

而且,虽然在某些平台上pthread_join会因信号而失败,EINTR,但在其他平台上则不会。在linux上,我相信这取决于你是选择BSD风格还是默认的siginterrupt行为,但默认是否定的。


那么,你能做些什么呢?

嗯,我敢肯定changes to signal handling in Python 3.3实际上改变了Linux的默认行为,所以如果你升级你不需要做任何事情;只需运行在3.3+以下,并且您的代码将按照您的预期工作。至少它适用于OS X上的CPython 3.4和Linux上的3.3。 (如果我对此有错,我不确定它是否是CPython中的错误,因此您可能希望在python-list中提高它,而不是打开问题......)

另一方面, 3.3之前版本,signal模块肯定不会公开自己需要的工具来解决这个问题。所以,如果你不能升级到3.3,那么解决办法就是等待一些可中断的东西,比如ConditionEvent。子线程在退出之前通知事件,主线程在加入子线程之前等待事件。这绝对是不好的。我找不到任何能保证它有所作为的东西;它恰好适用于我在OS X上的各种版本的CPython 2.7和3.2以及Linux上的2.6和2.7 ...

+0

“这绝对是个黑客” - 我不会那么说。在一个更高的抽象层次上同步你的线程,而不是简单地使用'join'是明智的。如果你的目标是等待线程退出(就像这个特定的例子),那么'join'就是正确的工具。如果您想等待工作量完成,那么“条件”等就更有意义了。毕竟,工作负载可以在不会立即退出的池化线程中执行(例如)。 – 2018-02-04 08:13:33

8

abarnert的答案是现货。但我仍然在使用Python 2.7。为了解决这个问题,我写了一个InterruptableThread类。

现在它不允许将其他参数传递给线程目标。连接也不接受超时参数。这只是因为我不需要那样做。你可以添加它,如果你想。如果你自己使用它,你可能会想要移除输出语句。他们只是作为评论和测试的一种方式。

import threading 
import signal 
import sys 

class InvalidOperationException(Exception): 
    pass  

# noinspection PyClassHasNoInit 
class GlobalInterruptableThreadHandler: 
    threads = [] 
    initialized = False 

    @staticmethod 
    def initialize(): 
     signal.signal(signal.SIGTERM, GlobalInterruptableThreadHandler.sig_handler) 
     signal.signal(signal.SIGINT, GlobalInterruptableThreadHandler.sig_handler) 
     GlobalInterruptableThreadHandler.initialized = True 

    @staticmethod 
    def add_thread(thread): 
     if threading.current_thread().name != 'MainThread': 
      raise InvalidOperationException("InterruptableThread objects may only be started from the Main thread.") 

     if not GlobalInterruptableThreadHandler.initialized: 
      GlobalInterruptableThreadHandler.initialize() 

     GlobalInterruptableThreadHandler.threads.append(thread) 

    @staticmethod 
    def sig_handler(signum, frame): 
     sys.stdout.write("handling signal: %s\n" % signum) 
     sys.stdout.flush() 

     for thread in GlobalInterruptableThreadHandler.threads: 
      thread.stop() 

     GlobalInterruptableThreadHandler.threads = []  

class InterruptableThread: 
    def __init__(self, target=None): 
     self.stop_requested = threading.Event() 
     self.t = threading.Thread(target=target, args=[self]) if target else threading.Thread(target=self.run) 

    def run(self): 
     pass 

    def start(self): 
     GlobalInterruptableThreadHandler.add_thread(self) 
     self.t.start() 

    def stop(self): 
     self.stop_requested.set() 

    def is_stop_requested(self): 
     return self.stop_requested.is_set() 

    def join(self): 
     try: 
      while self.t.is_alive(): 
       self.t.join(timeout=1) 
     except (KeyboardInterrupt, SystemExit): 
      self.stop_requested.set() 
      self.t.join() 

     sys.stdout.write("join completed\n") 
     sys.stdout.flush() 

该类可以使用两种不同的方法。你可以分类InterruptableThread:

import time 
import sys 
from interruptable_thread import InterruptableThread 

class Foo(InterruptableThread): 
    def __init__(self): 
     InterruptableThread.__init__(self) 

    def run(self): 
     sys.stdout.write("run started\n") 
     sys.stdout.flush() 
     while not self.is_stop_requested(): 
      time.sleep(2) 

     sys.stdout.write("run exited\n") 
     sys.stdout.flush() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

foo = Foo() 
foo2 = Foo() 
foo.start() 
foo2.start() 
foo.join() 
foo2.join() 

或者你可以使用它更像threading.thread工作方式。尽管运行方法必须将InterruptableThread对象作为参数。

import time 
import sys 
from interruptable_thread import InterruptableThread 

def run(t): 
    sys.stdout.write("run started\n") 
    sys.stdout.flush() 
    while not t.is_stop_requested(): 
     time.sleep(2) 

    sys.stdout.write("run exited\n") 
    sys.stdout.flush() 

t1 = InterruptableThread(run) 
t2 = InterruptableThread(run) 
t1.start() 
t2.start() 
t1.join() 
t2.join() 

sys.stdout.write("all exited\n") 
sys.stdout.flush() 

用它做什么你会。

1

我在这里遇到了同样的问题signal not handled when multiple threads join。在阅读abarnert的回答后,我改为使用Python 3并解决了这个问题。但我确实喜欢将我的所有程序都更改为python 3.所以,我通过在发送信号之前避免调用线程join()来解决我的程序。以下是我的代码。

这不是很好,但解决了我的程序在python 2.7。我的问题被标记为重复,所以我把我的解决方案放在这里。

import threading, signal, time, os 


RUNNING = True 
threads = [] 

def monitoring(tid, itemId=None, threshold=None): 
    global RUNNING 
    while(RUNNING): 
     print "PID=", os.getpid(), ";id=", tid 
     time.sleep(2) 
    print "Thread stopped:", tid 


def handler(signum, frame): 
    print "Signal is received:" + str(signum) 
    global RUNNING 
    RUNNING=False 
    #global threads 

if __name__ == '__main__': 
    signal.signal(signal.SIGUSR1, handler) 
    signal.signal(signal.SIGUSR2, handler) 
    signal.signal(signal.SIGALRM, handler) 
    signal.signal(signal.SIGINT, handler) 
    signal.signal(signal.SIGQUIT, handler) 

    print "Starting all threads..." 
    thread1 = threading.Thread(target=monitoring, args=(1,), kwargs={'itemId':'1', 'threshold':60}) 
    thread1.start() 
    threads.append(thread1) 
    thread2 = threading.Thread(target=monitoring, args=(2,), kwargs={'itemId':'2', 'threshold':60}) 
    thread2.start() 
    threads.append(thread2) 
    while(RUNNING): 
     print "Main program is sleeping." 
     time.sleep(30) 
    for thread in threads: 
     thread.join() 

    print "All threads stopped."