2015-02-06 51 views
8

我有一个函数执行一些模拟,并且 返回一个字符串格式的数组。Python多处理 - 跟踪pool.map操作的过程

我想运行 变化的输入参数值,超过10000个可能的输入值, 的仿真(功能),并将结果写入单个文件。

我正在使用多处理,特别是pool.map函数 并行运行模拟。

由于运行仿真功能10000次以上的整个过程需要很长时间,我真的很想跟踪整个操作过程。

我认为我当前代码中的问题是,pool.map运行10000次函数,在这些操作过程中没有任何进程跟踪。一旦并行处理完成10000次仿真(可能需要数小时至数天),那么当10000个仿真结果保存到文件时,我会继续跟踪。因此,这并不是真正跟踪pool.map操作的处理。

是否有一个容易修复我的代码,将允许进程跟踪?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

回答

7

如果使用迭代的map函数,可以很容易地跟踪进度。您也可以使用异步map。在这里,我会做一些不同的事情,只是混淆。

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

请注意,我使用pathos.multiprocessing代替multiprocessing。这只是multiprocessing的一个分支,它使您能够利用多个输入来执行map函数,具有更好的序列化,并允许您在任何地方(而不仅仅是在__main__)执行map调用。您也可以使用multiprocessing来完成上述操作,但代码会略有不同。

迭代或异步map将使您能够编写任何代码,以便执行更好的过程跟踪。例如,将一个唯一的“id”传递给每个作业,并观察哪些回来,或者让每个作业返回它的进程ID。有很多方法可以跟踪进度和流程...但上面的内容应该会为您提供一个开始。

你可以pathos这里:https://github.com/uqfoundation

+0

非常感谢你! – user32147 2015-02-26 17:08:45

3

没有“简单修复”。 map是关于隐藏你的实现细节。在这种情况下,你想要细节。就是说,根据定义,事情变得更加复杂一些。你需要改变通信范式。有很多方法可以这样做。

一个是:创建一个队列来收集您的结果,并让您的工作人员将结果放入此队列中。然后,您可以在监视线程或进程内查看队列,并在进入时使用结果。在使用时,您可以分析它们并生成日志输出。这可能是跟踪进度的最常用方式:您可以实时以任何方式响应传入结果。

更简单的方法可能是稍微修改您的辅助函数,并在那里生成日志输出。通过使用外部工具仔细分析日志输出(例如grepwc),您可以想出很简单的方法来跟踪。

+1

谢谢。你能提供一些简单的例子吗? – user32147 2015-02-06 22:53:50

3

我想你需要的是一个日志文件

我建议您使用日志记录模块,它是Python标准库的一部分。但不幸的是日志记录不是多处理安全的。所以你不能在你的应用程序中使用它。

因此,您将需要使用多处理安全的日志处理程序,或者使用Queue或实现您的模块或日志模块。

在Stackoverflow中有很多关于此的讨论。这比如:How should I log while using multiprocessing in Python?

如果大多数CPU的负载是在模拟功能,你不打算使用日志轮换,你也许可以用一个简单的锁定机制是这样的:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

你可以运行时“tail -f”您的日志文件。这是你应该看到的:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

在Windows和Linux上试过。

希望这会有所帮助

+0

'multiprocessing.get_logger()'返回受锁保护的特性受限记录器,请参阅https://docs.python.org/2/library/multiprocessing.html#logging – 2015-02-09 00:07:36

+0

是的,但是这是模块记录器...所以你可以使用它,你的日志将与模块级消息混合在一起:尝试它,你会看到这样的消息:2015-02-08 23:47:10,954 9288 DEBUG创建带句柄的semlock 448 – 2015-02-09 02:48:34

+0

哦,你对,我从来没有真正使用过它,并且太快地浏览了文档。 – 2015-02-09 13:04:49