2016-07-28 33 views
1

我正在使用multiprocessing.queues.JoinableQueue,如下所示:multiprocessing.Queue似乎消失了?操作系统(管道销毁)与Python?

一个非常长时间运行的线程(多天)轮询XML的API。这样做的线程只是将XML解析为对象,并将它们推送到队列中。

对每个对象的处理比解析XML要花费更多的时间,并且绝不取决于从API读取线程。因此,这种多处理的实现很简单。

用于创建和清理过程的代码是在这里:

def queueAdd(self, item): 
     try: 
      self.queue.put(item) 
     except AssertionError: 
      #queue has been closed, remake it (let the other GC) 
      logger.warn('Queue closed early.') 
      self.queue = BufferQueue(ctx=multiprocessing.get_context()) 
      self.queue.put(item) 
     except BrokenPipeError: 
      #workaround for pipe issue 
      logger.warn('Broken pipe, Forcing creation of new queue.') 
      # all reading procesess should suicide and new ones spawned. 
      self.queue = BufferQueue(ctx=multiprocessing.get_context()) 
#    address = 'localhost' 
#    if address in multiprocessing.managers.BaseProxy._address_to_local: 
#     del BaseProxy._address_to_local[address][0].connection 
      self.queue.put(item) 
     except Exception as e: 
      #general thread exception. 
      logger.error('Buffer queue exception %s' % e) 
      #TODO: continue trying/trap exceptions? 
      raise 
     # check for finished consumers and clean them up before we check to see 
     # if we need to add additional consumers. 
     for csmr in self.running: 
      if not csmr.is_alive(): 
       debug('Child dead, releasing.') 
       self.running.remove(csmr) 

     #see if we should start a consumer... 
     # TODO: add min/max processes (default and override) 
     if not self.running: 
      debug('Spawning consumer.') 
      new_consumer = self.consumer(
        queue=self.queue, 
        results_queue=self.results_queue, 
        response_error=self.response_error) 
      new_consumer.start() 
      self.running.append(new_consumer) 

消费过程控制回路是相当简单的,以及:

def run(self): 
     '''Consumes the queue in the framework, passing off each item to the 
     ItemHandler method. 
     ''' 
     while True: 
      try: 
       item = self.queue.get(timeout=3) 
       #the base class just logs this stuff 
       rval = self.singleItemHandler(item) 
       self.queue.task_done() 
       if rval and self.results_queue: 
        self.results_queue.put(rval) 
      except queue.Empty: 
       logging.debug('Queue timed out after 3 seconds.') 
       break 
      except EOFError: 
       logging.info(
        '%s has finished consuming queue.' % (__class__.__name__)) 
       break 
      except Exception as e: 
       #general thread exception. 
       self.logger.error('Consumer exception %s' % e) 
       #TODO: continue trying/trap exceptions? 
       raise 

一段时间后(约成功处理一小时),我收到一条日志消息,指出消费者进程已由于超时DEBUG:root:Queue timed out after 3 seconds.而死亡,但队列仍处于打开状态,显然仍由原始线程写入。线程似乎并不认为消费者进程已经终止(请参阅queueAdd方法),并且不会尝试启动新进程。队列看起来不是空的,只是读取它似乎已超时。

我不知道为什么经理认为孩子还活着。

编辑


我已经修改了原来的问题由于代码更改为如何BrokenPipeError记录以及删除断开的连接清理。我认为这是一个单独的问题。

+0

一边仔细审查这个问题上,我意识到,我没有在这里列出的BrokenPipe异常记录输出:http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python -multiprocessing-managers-basemanager-syncmanager – SkyLeach

+0

我已经添加了额外的日志记录,如上面注释中所述,但是在另一次测试没有得到关于BrokenPipeError的日志消息之后。 – SkyLeach

回答

0

这个问题是由多处理微妙的现实引起的。任何调用queue.put的进程都将运行一个写入命名管道的后台线程。

在我的特殊情况下,尽管没有大量数据发布到结果队列(由于某些原因无法处理的项目),但仍然足以“填满”管道并导致消费者即使没有运行也无法退出。这反过来导致写作队列慢慢填满。

解决方案是,我修改了我的非阻塞调用,以便下一次api调用迭代,以读取迄今为止可用的所有结果,除了最后一次(阻塞)调用之外,它确保获取所有结果。

def finish(self, block=True, **kwargs): 
    ''' 
    Notifies the buffer that we are done filling it. 
    This command binds to any processes still running and lets them 
    finish and then copies and flushes the managed results list. 
    ''' 
    # close the queue and wait until it is consumed 
    if block: 
     self.queue.close() 
     self.queue.join_thread() 
     # make sure the consumers are done consuming the queue 
     for csmr in self.running: 
      #get everything on the results queue right now. 
      try: 
       while csmr.is_alive(): 
        self.results_list.append(
         self.results_queue.get(timeout=0.5)) 
        self.results_queue.task_done() 
      except queue.Empty: 
       if csmr.is_alive(): 
        logger.warn('Result queue empty but consumer alive.') 
        logger.warn('joining %s.' % csmr.name) 
        csmr.join() 
     del self.running[:] 
     if self.callback: 
      return self.callback(self.results_list) 
    else: 
     #read results immediately available. 
     try: 
      while True: 
       self.results_list.append(self.results_queue.get_nowait()) 
       self.results_queue.task_done() 
     except queue.Empty: 
      #got everything on the queue so far 
      pass 
    return self.results_list