我正在使用multiprocessing.queues.JoinableQueue,如下所示:multiprocessing.Queue似乎消失了?操作系统(管道销毁)与Python?
一个非常长时间运行的线程(多天)轮询XML的API。这样做的线程只是将XML解析为对象,并将它们推送到队列中。
对每个对象的处理比解析XML要花费更多的时间,并且绝不取决于从API读取线程。因此,这种多处理的实现很简单。
用于创建和清理过程的代码是在这里:
def queueAdd(self, item):
try:
self.queue.put(item)
except AssertionError:
#queue has been closed, remake it (let the other GC)
logger.warn('Queue closed early.')
self.queue = BufferQueue(ctx=multiprocessing.get_context())
self.queue.put(item)
except BrokenPipeError:
#workaround for pipe issue
logger.warn('Broken pipe, Forcing creation of new queue.')
# all reading procesess should suicide and new ones spawned.
self.queue = BufferQueue(ctx=multiprocessing.get_context())
# address = 'localhost'
# if address in multiprocessing.managers.BaseProxy._address_to_local:
# del BaseProxy._address_to_local[address][0].connection
self.queue.put(item)
except Exception as e:
#general thread exception.
logger.error('Buffer queue exception %s' % e)
#TODO: continue trying/trap exceptions?
raise
# check for finished consumers and clean them up before we check to see
# if we need to add additional consumers.
for csmr in self.running:
if not csmr.is_alive():
debug('Child dead, releasing.')
self.running.remove(csmr)
#see if we should start a consumer...
# TODO: add min/max processes (default and override)
if not self.running:
debug('Spawning consumer.')
new_consumer = self.consumer(
queue=self.queue,
results_queue=self.results_queue,
response_error=self.response_error)
new_consumer.start()
self.running.append(new_consumer)
消费过程控制回路是相当简单的,以及:
def run(self):
'''Consumes the queue in the framework, passing off each item to the
ItemHandler method.
'''
while True:
try:
item = self.queue.get(timeout=3)
#the base class just logs this stuff
rval = self.singleItemHandler(item)
self.queue.task_done()
if rval and self.results_queue:
self.results_queue.put(rval)
except queue.Empty:
logging.debug('Queue timed out after 3 seconds.')
break
except EOFError:
logging.info(
'%s has finished consuming queue.' % (__class__.__name__))
break
except Exception as e:
#general thread exception.
self.logger.error('Consumer exception %s' % e)
#TODO: continue trying/trap exceptions?
raise
一段时间后(约成功处理一小时),我收到一条日志消息,指出消费者进程已由于超时DEBUG:root:Queue timed out after 3 seconds.
而死亡,但队列仍处于打开状态,显然仍由原始线程写入。线程似乎并不认为消费者进程已经终止(请参阅queueAdd方法),并且不会尝试启动新进程。队列看起来不是空的,只是读取它似乎已超时。
我不知道为什么经理认为孩子还活着。
编辑
我已经修改了原来的问题由于代码更改为如何BrokenPipeError记录以及删除断开的连接清理。我认为这是一个单独的问题。
一边仔细审查这个问题上,我意识到,我没有在这里列出的BrokenPipe异常记录输出:http://stackoverflow.com/questions/3649458/broken-pipe-when-using-python -multiprocessing-managers-basemanager-syncmanager – SkyLeach
我已经添加了额外的日志记录,如上面注释中所述,但是在另一次测试没有得到关于BrokenPipeError的日志消息之后。 – SkyLeach