2015-10-14 36 views
-1

我正在使用python多进程库来处理一组进程中的信息。这些过程还包含进一步划分必须完成的工作量的过程。有一个Manager.Queue积累了所有消耗数据的进程的结果。Python进程在IO完成之前终止

在python脚本的主线程中。我试图使用连接来阻塞主线程,直到我们可以合理确定所有子进程是否完成,然后将输出写入单个文件。但是,在所有数据写入文件之前,系统会终止并关闭文件。

以下代码是上述解决方案实现的简化提取。 用于inQueues队列: queue.join()

for p in processes: 
    p.join() 
print "At the end output has: " + str(out_queue.qsize()) + " records" 

with open("results.csv", "w") as out_file: 
    out_file.write("Algorithm,result\n") 
    while not out_queue.empty(): 
     res = out_queue.get() 
     out_file.write(res['algorithm'] + ","+res['result']+"\n") 
     out_queue.task_done() 
     time.sleep(0.05) 
    out_queue.join() 
    out_file.close() 

的out_queue.qsize()将打印过量的500个记录可用,但是只有100将被打印到该文件。 同样在这一点上,如果500条记录是系统生成的总数,我不能100%确定,但只是此时报告的数字。

如何确保将所有结果写入results.csv文件?

+0

[QSIZE()](http://bugs.python.org/issue17985):“返回队列的近似大小由于。多线程/多处理语义, 是不可靠的。“ – kay

+0

我知道,由qsize方法指示的队列大小可能会发生变化,但代码段是整个程序中从队列中删除的唯一部分,因此预计打印的记录数不会小于队列的大小(这是当前发生的)。 – kyleED

回答

0

不要等到所有的进程完成你消耗的数据之前,但处理在同一时间的数据,并记住其进程仍在运行:

processes = [] 

"""start processes and append them to processes""" 

while True: 
    try: 
     # get an item 
     item = queue.get(True, 0.5) 
    except Queue.Empty: 
     # no item received in half a second 
     if not processes: 
      # there are no more processes and nothing left to process 
      break 
     else: 
      proc_num = 0 
      while proc_num < len(processes): 
       process = processes[proc_num] 
       exit_code = process.poll() 
       if exit_code is None: 
        # process is still running, proceed to next 
        proc_num += 1 
       elif exit_code == 0: 
        # process ended gracefully, remove it from list 
        processes.pop(proc_num) 
       else: 
        # process ended with an error, what now? 
        raise Exception('Her last words were: "%r"' % exit_code) 
    else: 
     # got an item 
     """process item""" 

,不测试,如果processes是空的外Queue.Empty或您将有races

,不过也许你会更开心一higher level function

pool = multiprocessing.Pool(8) 
items = pool.map_async(producer_function, producer_arguments) 
for item in items: 
    """process item""" 
相关问题