2017-03-16 35 views
1

我想从Python为大约8000个文件启动外部命令。每个文件都是独立处理的。唯一的限制是所有文件都被处理后继续执行。我有4个物理核心,每个核心有2个逻辑核心(multiprocessing.cpu_count()返回8)。我的想法是使用四个并行独立进程池,这些进程将在8个内核中的4个内核上运行。这样我的机器在此期间应该可以使用。使用外部命令多处理数千个文件

这是我一直在做的事情:

import multiprocessing 
import subprocess 
import os 
from multiprocessing.pool import ThreadPool 


def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.Popen, (cmd,)) 
    pool.close() 
    pool.join() 


def main(): 
    process_files('dir1', 'dir2', 'mol:H') 
    do_some_stuff('dir2') 
    process_files('dir2', 'dir3', 'mol:a') 
    do_more_stuff('dir3') 

一个连续的治疗需要120秒为一个批次的100个文件。上面概述的多处理版本(功能process_files)仅需20秒。但是,当我在整个8000个文件集上运行process_files时,我的PC挂起并在一小时后不冻结。

我的问题是:

1)我想ThreadPool应该初始化进程池(这里multiprocessing.cpu_count()/2过程,要准确)。然而,我的计算机挂在8000个文件上,但不是100个,这表明可能没有考虑池的大小。要么,要么我做错了什么。你能解释一下吗?

2)这是在Python中启动独立进程时的正确方法,当它们每个都必须启动一个外部命令时,并且这样所有资源都不会被处理占用?

+0

我比较@larsks('ThreadPool'和'apply_async'和subprocess'call's)和@Roland Smith(使用'Popen'对象的手动池管理)提出的解决方案。我的基准测试表明'ThreadPool'解决方案在实践中速度更快。非常感谢你们! – user3638629

回答

1

我觉得你的基本问题是使用subprocess.Popen。该方法不会而不是在返回之前等待命令完成。由于该函数立即返回(即使该命令仍在运行),只要您的ThreadPool有关,该函数就完成了,并且它可以产生另一个...这意味着您最终会产生8000个左右的进程。

你会使用subprocess.check_call可能有更好的运气:

Run command with arguments. Wait for command to complete. If 
the exit code was zero then return, otherwise raise 
CalledProcessError. The CalledProcessError object will have the 
return code in the returncode attribute. 

所以:

def process_files(input_dir, output_dir, option): 
    pool = ThreadPool(multiprocessing.cpu_count()/2) 
    for filename in os.listdir(input_dir): # about 8000 files 
     f_in = os.path.join(input_dir, filename) 
     f_out = os.path.join(output_dir, filename) 
     cmd = ['molconvert', option, f_in, '-o', f_out] 
     pool.apply_async(subprocess.check_call, (cmd,)) 
    pool.close() 
    pool.join() 

如果你真的不关心的退出代码,那么你可能想subprocess.call,不会在进程中出现非零退出代码时引发异常。

+0

谢谢你对这个非常清楚和直接的事实解释。事实上'subprocess.Popen'必须是导致这么多进程产生的原因。我没有使用'subprocess.call',认为Python会等待完成这个过程,而不是用有用的工作者来填充这个池。但这就是为什么游泳池是在那里的原因。 (对不起,代表太低,无法上传。) – user3638629

+0

您仍然可以通过单击此答案左侧的复选标记将其标记为“已接受”答案。 – larsks

+0

是的,我知道。麻烦的是,我在决定两个非常有用的答案(我目前正在根据两个提出的解决方案测试结果)之间做出决定时很困难。 :d – user3638629

1

如果您使用的是Python 3,我会考虑使用map方法concurrent.futures.ThreadPoolExecutor

或者,您可以自己管理子流程列表。

以下示例定义了一个函数来启动ffmpeg将视频文件转换为Theora/Vorbis格式。它为每个启动的子进程返回一个Popen对象。

def startencoder(iname, oname, offs=None): 
    args = ['ffmpeg'] 
    if offs is not None and offs > 0: 
     args += ['-ss', str(offs)] 
    args += ['-i', iname, '-c:v', 'libtheora', '-q:v', '6', '-c:a', 
      'libvorbis', '-q:a', '3', '-sn', oname] 
    with open(os.devnull, 'w') as bb: 
     p = subprocess.Popen(args, stdout=bb, stderr=bb) 
    return p 

在主程序中,表示运行的子进程Popen对象的列表被保持这个样子。

outbase = tempname() 
ogvlist = [] 
procs = [] 
maxprocs = cpu_count() 
for n, ifile in enumerate(argv): 
    # Wait while the list of processes is full. 
    while len(procs) == maxprocs: 
     manageprocs(procs) 
    # Add a new process 
    ogvname = outbase + '-{:03d}.ogv'.format(n + 1) 
    procs.append(startencoder(ifile, ogvname, offset)) 
    ogvlist.append(ogvname) 
# All jobs have been submitted, wail for them to finish. 
while len(procs) > 0: 
    manageprocs(procs) 

因此,只有当运行子进程少于核心时才启动新进程。多次使用的代码分为manageprocs函数。

def manageprocs(proclist): 
    for pr in proclist: 
     if pr.poll() is not None: 
      proclist.remove(pr) 
    sleep(0.5) 

sleep的调用用于防止程序在循环中旋转。

+0

感谢您提到'concurrent.futures.ThreadPoolExecutor'(这里仍然使用Python 2.7)。感谢您提供手动池管理的好例子。我试图做类似的事情(考虑到迭代它时我不应该在列表中移除),但一定是出了问题。我将很快测试这个解决方案。 (对不起,代表太低,不能upvote。) – user3638629

+0

我比较了两种方法(你的答案和@larsks')。我非常喜欢这个解决方案,但似乎手动管理池会导致开销,可能是因为对“睡眠”的调用(我让进程管理器睡了0.2秒,因为它看起来更合适)。在批量测试中,我的真实输入大小的1/10,手动池管理比'cpu_count() - 1'内核上的ThreadPool慢8%,比'cpu_count()/ 2'上的'ThreadPool'慢27%核心。 – user3638629

+0

你必须做一些真正的分析,看看差异来自哪里。有很多因素会影响事物。例如,'cpu_count()'是子进程的最优*数量。您应该尝试从'cpu_count()/ 2'到'cpu_count()* 2'的范围内的任何内容。此外,您应该根据“molconvert”通常需要的时间调整“睡眠”的数量。但是,由于我已经完全切换到Python 3,我现在倾向于使用'concurrent.futures.ThreadPoolExecutor'这样的东西。 –

相关问题