2015-04-20 48 views
2

我意识到我可以使用Pool类,并可能得到我需要的东西,但我希望对我的问题有一个更好的控制。我有更多的工作比我做处理器,所以我不希望他们一次全部运行。Python - 多处理 - 更多的工作,cpus。

例如:

from multiprocessing import Process,cpu_count 
for dir_name in directories: 
    src_dir = os.path.join(top_level,dir_name) 
    dst_dir = src_dir.replace(args.src_dir,args.target_dir) 
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,)) 
    p.start() 

但是,如果我有16个以上的目录,然后我会开始更多的就业机会比我处理器。这是我的解决方案,真是黑客。

from multiprocessing import Process,cpu_count 
jobs = [] 
for dir_name in directories: 
    src_dir = os.path.join(top_level,dir_name) 
    dst_dir = src_dir.replace(args.src_dir,args.target_dir) 
    p = Process(target=transfer_directory, args=(src_dir, dst_dir,)) 
    jobs.append(p) 

alive_jobs = [] 
while jobs: 
    if len(alive_jobs) >= cpu_count(): 
     time.sleep(5) 
     print alive_jobs 
     for aj in alive_jobs: 
      if aj.is_alive(): 
       continue 
      else: 
       print "job {} removed".format(aj) 
       alive_jobs.remove(aj) 

     continue 

    for job in jobs: 
     if job.is_alive(): 
      continue 
     job.start() 
     alive_jobs.append(job) 
     print alive_jobs 
     jobs.remove(job) 
     if len(alive_jobs) >= cpu_count(): 
      break 

是否有更好的解决方案使用内置工具?

+0

这会给你更好的控制吗? –

+0

那么如果有人有一个解决方案,能够键盘中断池模块没有冻结,并不得不关闭终端 – jwillis0720

+3

相关:[键盘中断与python的多处理池](http://stackoverflow.com/questions/1408356/键盘中断与 - 蟒蛇的多重处理池)。 –

回答

2

我想在这里分享我的想法:创建的进程数等于CPU_COUNT(),使用队列存储所有的目录,并通过队列到您的transfer_directory方法,需要dir_name出从队列一旦过程完成后其工作。看起来像这样的草案:

NUM_OF_PROCESSES = multiprocessing.cpu_count() 
TIME_OUT_IN_SECONDS = 60 

for dir_name in directories: 
    my_queue.put(dir_name) 

# creates processes that equals to number of CPU 
processes = [multiprocessing.Process(target=transfer_directory, args=(my_queue,)) for x in range(NUM_OF_PROCESSES)] 

# starts processes 
for p in processes: 
    p.start() 

# blocks the calling thread 
for p in processes: 
    p.join() 



def transfer_directory(my_queue): 
    """processes element of directory queue if queue is not empty""" 
    while my_queue is not empty: 
     dir_name = my_queue.get(timeout=TIME_OUT_IN_SECONDS) 
     src_dir = os.path.join(top_level,dir_name) 
     dst_dir = src_dir.replace(args.src_dir,args.target_dir) 

编辑: 它也可以有效地用于读取大文件。 我一直在努力如何使用multiprocessing一段时间来读取一个巨大的文件(它超过1000万行),最后我使用单个进程启动producer(a_queue),刚刚读取并将行放入队列中,然后启动多个consumers(a_queue)a_queue采取线路,并做耗时的工作,它适用于我。