2013-08-21 88 views
0

我有一个守护进程,它会启动若干子进程,每个子进程维护一个 telnet 连接,用于从一组气象站收集数据。按照目前的设计,这些子进程会一直读取各自的 telnet 连接,并通过 multiprocessing.Queue 把天气读数传回父进程。但是当我用 ./test.py stop 停止守护进程时,这些子进程似乎无法完全退出。有没有简单的方法可以在守护进程退出时关闭这些子进程?我快速搜索了一下,看到有人提到使用 multiprocessing.Event;要确保子进程退出,在退出时设置该事件的最佳方式是什么?下面是我们当前的代码(问题标题:Python 多进程如何干净地退出):

import os

# Point Django at its settings module *before* importing anything that may
# touch Django settings.  The original assigned this after the model import
# below, which is too late to influence that import.
# NOTE(review): the model module is presumably a Django model module that
# reads settings while being imported -- confirm for the Django version in use.
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'

from multiprocessing import Process, Queue  # noqa: E402
import telnetlib  # noqa: E402

from daemon import runner  # noqa: E402
from django.utils.encoding import force_text  # noqa: E402

from observations.weather.models import WeatherStation  # noqa: E402


def read_weather_data(name, ip_address, port, queue):
    """Stream readings from one weather station into ``queue``.

    Opens a telnet connection to ``ip_address``:``port`` and, for every chunk
    read up to and including a CRLF, puts a ``(name, text)`` tuple on
    ``queue``, where ``text`` is the chunk passed through ``force_text``.

    The read loop has no exit condition of its own: the function only ends
    when an exception escapes it (telnetlib documents ``EOFError`` from
    ``read_until`` once the peer has closed the connection) or when the
    process is terminated from outside.

    :param name: label put on the queue alongside every reading.
    :param ip_address: host passed to ``telnetlib.Telnet``.
    :param port: port passed to ``telnetlib.Telnet``.
    :param queue: object with a ``put`` method receiving the tuples.
    """
    # Single-argument call form: prints the same text on Python 2 and is
    # also valid syntax on Python 3.
    print("Started process to get data for %s" % (name,))
    client = telnetlib.Telnet(ip_address, port)
    line_end = b'\r\n'

    # The original placed client.close() after ``while True``, where it could
    # never run.  try/finally closes the socket whenever the loop is left by
    # an exception.
    try:
        while True:
            response = client.read_until(line_end)
            queue.put((name, force_text(response)))
    finally:
        client.close()


class App(object):
    """Application object handed to ``runner.DaemonRunner`` by this script.

    ``run`` starts one reader process per active weather station and prints
    every reading the readers put on a shared queue.
    """

    def __init__(self):
        # Stream paths, pid file location and pid file timeout; presumably
        # the attributes daemon.runner.DaemonRunner reads from the object it
        # is given -- confirm against the python-daemon version in use.
        self.stdin_path = '/dev/null'
        self.stdout_path = '/dev/tty'
        self.stderr_path = '/dev/tty'
        self.pidfile_path = '/tmp/process_weather.pid'
        self.pidfile_timeout = 5

    def run(self):
        """Start the reader processes and print readings until interrupted."""
        queue = Queue()

        for station in WeatherStation.objects.filter(active=True):
            worker = Process(
                target=read_weather_data,
                args=(station.name, station.ip_address, station.port, queue),
            )
            # The readers loop forever, so a non-daemonic worker would make
            # the implicit join performed when this process exits block
            # indefinitely.  multiprocessing terminates daemonic children on
            # exit instead, so stopping the parent also stops the readers.
            worker.daemon = True
            worker.start()

        while True:
            name, data = queue.get()
            # Same output as the Python 2 statement
            # ``print "Received data from ", name`` (literal, space, name).
            print("%s %s" % ("Received data from ", name))
            print(data)

# Kept at module level so the name ``app`` stays available to importers.
app = App()

# Only drive the daemon when executed as a script.  Without the guard, merely
# importing this module (which multiprocessing itself does in child processes
# under the "spawn" start method) would run the daemon action again.
if __name__ == '__main__':
    daemon_runner = runner.DaemonRunner(app)
    daemon_runner.do_action()

回答

2

我似乎已经找到了一种可行的方法,但不确定这是不是最好的做法。

import os

# Point Django at its settings module *before* importing anything that may
# touch Django settings.  The original assigned this after the model import
# below, which is too late to influence that import.
# NOTE(review): the model module is presumably a Django model module that
# reads settings while being imported -- confirm for the Django version in use.
os.environ['DJANGO_SETTINGS_MODULE'] = 'settings'

import errno  # noqa: E402
from multiprocessing import Process, Queue, Event  # noqa: E402
import signal  # noqa: E402
import telnetlib  # noqa: E402

from daemon import runner  # noqa: E402
from django.utils.encoding import force_text  # noqa: E402

from observations.weather.models import WeatherStation  # noqa: E402


def read_weather_data(name, ip_address, port, queue, exit, poll_interval=1.0):
    """Stream readings from one weather station into ``queue`` until told to stop.

    Opens a telnet connection to ``ip_address``:``port`` and puts a
    ``(name, text)`` tuple on ``queue`` for every CRLF-terminated line
    received, where ``text`` is the line (terminator included) passed through
    ``force_text``.  The loop ends once ``exit`` is set.

    :param name: label put on the queue alongside every reading.
    :param ip_address: host passed to ``telnetlib.Telnet``.
    :param port: port passed to ``telnetlib.Telnet``.
    :param queue: object with a ``put`` method receiving the tuples.
    :param exit: event-like object; the loop stops when ``exit.is_set()``.
    :param poll_interval: maximum number of seconds one read may block before
        ``exit`` is checked again.  The original read had no timeout, so a
        silent station kept the process from ever noticing the stop request.
    :raises EOFError: propagated from ``read_until``, which telnetlib
        documents as raised when the connection is closed and no data is
        available.
    """
    # Single-argument call form: prints the same text on Python 2 and is
    # also valid syntax on Python 3.
    print("Started process to get data for %s" % (name,))
    client = telnetlib.Telnet(ip_address, port)
    line_end = b'\r\n'
    pending = b''

    # try/finally so the socket is closed even when an exception ends the loop.
    try:
        while not exit.is_set():
            # With a timeout, read_until returns whatever has arrived so far
            # (possibly nothing) when no terminator shows up in time, so
            # partial data is buffered until a full line is available.
            pending += client.read_until(line_end, poll_interval)
            while line_end in pending:
                line, _, pending = pending.partition(line_end)
                queue.put((name, force_text(line + line_end)))

        print("exit called for %s" % (name,))
    finally:
        client.close()


def exit_handler(signum, frame):
    """Report that an exit was requested.

    Takes the ``(signum, frame)`` arguments of a signal handler but uses
    neither of them.

    NOTE(review): nothing in the visible file installs this module-level
    function; ``App.run`` registers its own nested handler instead.
    """
    # Single-argument call form: prints the same text on Python 2 and is
    # also valid syntax on Python 3 (the print statement is not).
    print("exiting...")


class App(object):
    """Application object handed to ``runner.DaemonRunner`` by this script.

    ``run`` starts one reader process per active weather station, prints the
    readings they put on a shared queue, and shuts the readers down when
    SIGTERM is received.
    """

    def __init__(self):
        # Stream paths, pid file location and pid file timeout; presumably
        # the attributes daemon.runner.DaemonRunner reads from the object it
        # is given -- confirm against the python-daemon version in use.
        self.stdin_path = '/dev/null'
        self.stdout_path = '/dev/tty'
        self.stderr_path = '/dev/tty'
        self.pidfile_path = '/tmp/process_weather.pid'
        self.pidfile_timeout = 5

    def run(self):
        """Run the readers until SIGTERM, then wait for them to finish."""
        stop_requested = Event()

        def exit_handler(signum, frame):
            print("exiting...")
            stop_requested.set()

        signal.signal(signal.SIGTERM, exit_handler)
        queue = Queue()

        workers = [
            Process(
                target=read_weather_data,
                args=(station.name, station.ip_address, station.port,
                      queue, stop_requested),
            )
            for station in WeatherStation.objects.filter(active=True)
        ]
        for worker in workers:
            worker.start()

        self._consume_readings(queue, workers, stop_requested)

        for worker in workers:
            worker.join()

    def _consume_readings(self, readings, workers, stop_requested,
                          poll_interval=1.0):
        """Print readings until stop is requested and every worker has exited.

        The original loop blocked in ``get()`` with no timeout and relied on
        the signal interrupting that call (EINTR) to leave the loop.  A signal
        delivered while the loop was printing would set the event without
        interrupting anything, and the next ``get()`` would block forever.
        Polling with a timeout removes that race.  Draining the queue until
        the workers are gone also avoids joining a worker that is still
        flushing queued readings.

        :param readings: queue-like object whose ``get(timeout=...)`` raises
            ``Empty`` when nothing arrives in time.
        :param workers: objects with ``is_alive()`` (the reader processes).
        :param stop_requested: event-like object with ``is_set()``.
        :param poll_interval: seconds to wait for a reading before checking
            the stop condition again.
        :returns: number of readings printed.
        """
        # Function-scope import: the Empty exception lives in ``Queue`` on
        # Python 2 and ``queue`` on Python 3.
        try:
            from queue import Empty
        except ImportError:
            from Queue import Empty

        received = 0
        while True:
            try:
                name, data = readings.get(timeout=poll_interval)
            except Empty:
                workers_running = any(w.is_alive() for w in workers)
                if stop_requested.is_set() and not workers_running:
                    return received
                continue
            except IOError as e:
                # we received a signal whilst waiting for I/O
                if e.errno != errno.EINTR:
                    raise
                continue
            # Same output as the Python 2 statement
            # ``print "Received data from ", name`` (literal, space, name).
            print("%s %s" % ("Received data from ", name))
            print(data)
            received += 1


# Kept at module level so the name ``app`` stays available to importers.
app = App()

# Only drive the daemon when executed as a script.  Without the guard, merely
# importing this module (which multiprocessing itself does in child processes
# under the "spawn" start method) would run the daemon action again.
if __name__ == '__main__':
    daemon_runner = runner.DaemonRunner(app)
    daemon_runner.do_action()