
Using concurrent.futures without running out of memory

I'm doing some file parsing, which is a CPU-bound task. No matter how many files I throw at the process, it uses no more than about 50MB of RAM. The task is parallelisable, and I've set it up to use concurrent.futures below to parse each file in a separate process:

from concurrent import futures

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain the future in the key and the filename in the value
    jobs = {}

    # Loop through the files and run the parse function for each file, sending the file name to it.
    # The results can come back in any order.
    for this_file in files_list:
        job = executor.submit(parse_function, this_file, **parser_variables)
        jobs[job] = this_file

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):

        # Get the result of the job (job.result()) and the file it was based on (jobs[job])
        results_list = job.result()
        this_file = jobs[job]

        # Delete the entry from the dict as we don't need to store it
        del jobs[job]

        # Post-processing (putting the results into a database)
        post_process(this_file, results_list)

The problem is that when I run this with futures, RAM usage rockets, and before long I've run out and Python has crashed. This is probably in large part because the results from parse_function are several MB in size; once the results have been through post_process, the application no longer needs them. As you can see, I'm trying to clear items out of jobs with del jobs[job], but this has made no difference: memory usage stays the same and seems to climb at the same rate.

I've also confirmed that it's not because it's waiting on the post_process function, by using only a single process plus throwing in a time.sleep(1).

There's nothing in the futures docs about memory management, and while a brief search indicates it has come up before in real-world applications of futures (Clear memory in python loop, http://grokbase.com/t/python/python-list/1458ss5etz/real-world-use-of-concurrent-futures), the answers don't translate to my use case (they're all concerned with timeouts and the like).

So, how do you use concurrent.futures without running out of memory? (Python 3.5)

Answer


I'll take a shot at this (it might be a wrong guess...).

You might need to submit your work bit by bit, since on each submit you're making a copy of parser_variables, which may end up chewing through your RAM.
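If parser_variables itself is large, a minimal sketch of one way to avoid re-pickling it on every submit: install it once per worker process. This assumes Python 3.7+, where ProcessPoolExecutor accepts initializer/initargs (the question targets 3.5, where that option doesn't exist); parse_one and some_file are hypothetical names, while parse_function and parser_variables are taken from the question.

from concurrent import futures

_worker_parser_variables = None  # per-process global, filled in once by the initializer

def _init_worker(parser_variables):
    # Runs once in each worker process, so parser_variables is pickled
    # once per worker rather than once per submitted file.
    global _worker_parser_variables
    _worker_parser_variables = parser_variables

def parse_one(this_file):
    # Hypothetical wrapper around the question's parse_function.
    return parse_function(this_file, **_worker_parser_variables)

with futures.ProcessPoolExecutor(max_workers=6,
                                 initializer=_init_worker,
                                 initargs=(parser_variables,)) as executor:
    job = executor.submit(parse_one, some_file)  # no parser_variables in the submit call

Throttling how many jobs are outstanding at once, as in the working code below, still matters for keeping the results themselves from piling up.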

Here is working code, with "<----" marking the interesting parts:

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    # A dictionary which will contain the future in the key and the filename in the value
    jobs = {}

    # Loop through the files and run the parse function for each file, sending the file name to it.
    # The results can come back in any order.
    files_left = len(files_list)  # <----
    files_iter = iter(files_list)  # <----

    while files_left:
        for this_file in files_iter:
            job = executor.submit(parse_function, this_file, **parser_variables)
            jobs[job] = this_file
            if len(jobs) > MAX_JOBS_IN_QUEUE:
                break  # limit the job submission for now

        # Get the completed jobs whenever they are done
        for job in futures.as_completed(jobs):

            files_left -= 1  # one down - many to go... <----

            # Get the result of the job (job.result()) and the file it was based on (jobs[job])
            results_list = job.result()
            this_file = jobs[job]

            # Delete the entry from the dict as we don't need to store it
            del jobs[job]

            # Post-processing (putting the results into a database)
            post_process(this_file, results_list)
            break  # give a chance to add more jobs <----
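MAX_JOBS_IN_QUEUE is a cap you define yourself (for instance, a couple of times max_workers). For reference, a minimal sketch of the same throttling idea written with futures.wait instead of the while/break dance; this is not the answer's code, just the same technique, and it assumes the question's files_list, parse_function, parser_variables and post_process.

from concurrent import futures

MAX_JOBS_IN_QUEUE = 12  # assumed value; tune to taste

with futures.ProcessPoolExecutor(max_workers=6) as executor:
    jobs = {}  # future -> filename
    files_iter = iter(files_list)

    def top_up():
        # Submit new work until the in-flight limit is reached.
        for this_file in files_iter:
            jobs[executor.submit(parse_function, this_file, **parser_variables)] = this_file
            if len(jobs) >= MAX_JOBS_IN_QUEUE:
                break

    top_up()
    while jobs:
        # Block until at least one job finishes, then drain the finished ones.
        done, _ = futures.wait(jobs, return_when=futures.FIRST_COMPLETED)
        for job in done:
            this_file = jobs.pop(job)  # drop our reference to the future and its result
            post_process(this_file, job.result())
        top_up()  # refill the queue before waiting again

Either way, the key points are the same: cap the number of outstanding futures, and drop your reference to each future as soon as its result has been handled.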

Excellent answer, thanks. That's solved it nicely: peak RAM usage sits at around 140MB now. I'd never considered the inputs as being the problem (you're right, they're large too). (Having spent 20 minutes wondering why it wasn't really multiprocessing: the '...' line was over-indented, so it ended up as a child of the this_file loop in '...'; corrected now.) *Note to Python designers: invisible characters for crucial syntax are not a great idea!*


Thanks for the edit, I was in a hurry :)