2017-04-10 78 views
0

我有一个arcpy进程需要在一堆图层上进行联合,运行一些计算并编写HTML报告。鉴于我需要生成的报告数量(约2,100个),我需要尽快完成此过程(我的目标是每个报告2秒)。当我遇到一个问题时,我尝试了很多方法来做到这一点,包括多处理,也就是说,无论使用多少个内核,运行多进程部分的时间本质上都需要相同的时间。多核处理时间线性增加

例如,对于相同数量的报告:

  • 2芯花〜30秒每轮(所以40个报告需要40/2 * 30秒)
  • 4芯花〜60秒( 40/4 * 60)
  • 10芯花〜160秒(40/10 * 160)

等。它的工作时间相同,因为每次翻动两倍的时间需要两次。

这是否意味着我的问题是I/O绑定,而不是CPU绑定?(如果是这样 - 我该怎么办?)我认为这是后者,因为我的时间的大瓶颈是工会(它占用了大约50%的处理时间)。在ArcGIS中,联盟通常很昂贵,所以我认为破解它并且一次运行2 - 10会快2到10倍。或者,可能我错误地实施了多进程?

## Worker function just included to give some context 

def worker(sub_code): 
    layer = 'in_memory/lyr_{}'.format(sub_code) 
    arcpy.Select_analysis(subbasinFC, layer, where_clause="SUB_CD = '{}'".format(sub_code)) 
    arcpy.env.extent = layer 
    union_name = 'in_memory/union_' + sub_code 

    arcpy.Union_analysis([fields], 
        union_name, 
        "NO_FID", "1 FEET") 
    #.......Some calculations using cursors 

    # Templating using Jinjah 
    context = {} 
    context['DATE'] = now.strftime("%B %d, %Y") 
    context['SUB_CD'] = sub_code 
    context['SUB_ACRES'] = sum([r[0] for r in arcpy.da.SearchCursor(union, ["ACRES"], where_clause="SUB_CD = '{}'".format(sub_code))]) 
    # Etc 

    # Then write the report out using custom function 
    write_html('template.html', 'output_folder', context) 


if __name__ == '__main__': 
    subList = sorted({r[0] for r in arcpy.da.SearchCursor(subbasinFC, ["SUB_CD"])}) 
    NUM_CORES = 7 
    chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)] 
    for chunk in chunk_list: 
     jobs = [] 
     for subbasin in chunk: 
      p = multiprocessing.Process(target=worker, args=(subbasin,)) 
      jobs.append(p) 
      p.start() 

     for process in jobs: 
      process.join() 
+0

你的盒子有多少个核心? –

+0

8个物理16“逻辑处理器” – HFBrowning

回答

3

这里没有多少东西可以介绍,我对ArcGIS没有经验。所以我只能注意到两个更高层次的东西。首先,“通常的”方式来处理,这将是取代您NUM_CORES = 7下面所有的代码:

pool = multiprocessing.Pool(NUM_CORES) 
pool.map(worker, subList) 
pool.close() 
pool.join() 

map()负责保持所有的工作进程尽可能忙的。原来,你启动了7个进程,然后等待他们的全部完成。在最慢消失之前完成的所有进程,以及它们的核心处于空闲状态,等待下一个外部循环迭代。 A Pool可以让这7个流程在整个工作期间保持活跃状态​​,并在每完成一项工作后立即为每一项新工作提供帮助。

其次,这部分有逻辑错误结束:

chunk_list = [subList[i:i+NUM_CORES] for i in range(0, len(subList), NUM_CORES-1)] 

你想NUM_CORES存在,而不是NUM_CORES-1。作为-是,第一时间在你身边提取

subList[0:7] 

然后

subList[6:13] 

然后

subList[12:19] 

等。 subList[6]subList[12](等)每个提取两次。子列表重叠。

+0

+1,谢谢。我的问题(和实际情况)是一团糟,因为我的代码同时出现了太多的错误。接受是因为围绕“Process”玩弄我相信,等待所有进程完成的开销是造成奇怪减速的原因。使用'Pool'开始真的很强大 - 但我一直在避免它,因为它最终崩溃。我不认为我在删除记忆中的东西方面做得很好。当我知道所有细节时,我会回来更新我的问题 - 也许这样可能会更好地帮助别人。 – HFBrowning

+1

嗯。需要尝试的方法:将'maxtasksperchild = 1'添加到'Pool'构造函数,并将'chunksize = 1'添加到'map()'调用。这将强制为每个工作项目创建一个全新的流程。虽然这并不能“解决”任何事情,但在某些情况下,它可以有效地隐藏其他问题,至少可以取得进展;-) –

0

我不确定您是否正确使用Process池来跟踪您的工作。这:

for subbasin in chunk: 
    p = multiprocessing.Process(target=worker, args=(subbasin,)) 
    jobs.append(p) 
    p.start() 

    for process in jobs: 
     process.join() 

应改为:

for subbasin in chunk: 
    p = multiprocessing.Process(target=worker, args=(subbasin,)) 
    p.start() 
    p.join() 

有没有你要对spec of using the multiprocessing library具体原因是什么?在等待线程终止之前,您不会等待,直到它将创建一大堆父进程不处理的进程。

+0

如果OP按照您的建议进行操作,它们将保证不会有任何并行性:第二个循环启动一个进程,并在循环进入下一个进程之前等待它完成处理。它只是连续运行一个进程。 –

+0

@TimPeters:同意 - 根据您在响应中建议的内核数量,应该有另一个父循环。 OP的代码没有考虑到这一点,也没有遵循lib规范。 – tatlar

+0

我在OP的'Process'中没有看到任何错误。他们的代码同时触发'NUM_CORES'个进程,等待它们完成,然后循环获取下一个'NUM_CORES'个工作项。没关系。我只建议使用'map()',因为它更容易完成所有操作,并且以更有效的方式(保持所有工作进程尽可能繁忙)。什么 - 非常具体 - 你认为不符合规范?同时启动尽可能多的流程是绝对合理的(尽管在某些时候会变得适得其反)。 –

1

你不足以确定你在做什么。例如,你的env.workspace是什么? subbasinFC的值是多少?您似乎在每个过程开始时都要进行分析,将数据过滤为layer。但是subbasinFC来自磁盘或内存?如果它来自磁盘,我建议你在任何进程尝试过滤之前将所有内容都读入内存。如果你有足够的内存来支持它,这应该会加快速度。否则,是的,你对输入数据进行I/O绑定。

原谅我的arcpy无知,但为什么你在你的总和context['SUB_ACRES']插入一个where子句?您是不是已经在开始时过滤了sub_code? (我们不知道工会是什么,所以也许你会和未经过滤的东西结合......)

+0

+1,因为从磁盘观察中读取数据。你是对的 - 我将所有的读取操作从磁盘改为内存,并将基本工作器功能提高了3倍(连续运行需要90秒,现在需要30秒)。谢谢!这本身就是一个了不起的改进 – HFBrowning