2

我有大量的文件要处理。我写了一个脚本来获取,分类和绘制我想要的数据。到现在为止还挺好。我已经测试过它,并提供了预期的结果。matplotlib和python多线程/多处理文件处理

然后我想用多线程来做到这一点。我查看了互联网上的文档和示例,并在我的程序中使用一个线程正常工作。但是当我使用更多的时候,在某些时候,我得到了随机的matplotlib错误,并且我怀疑存在一些冲突,尽管我使用了一个带有剧情名称的函数,并且iI无法看到问题出在哪里。

这是整个脚本,如果您需要更多评论,我会添加它们。谢谢。

#!/usr/bin/python 
import matplotlib 
matplotlib.use('GTKAgg') 
import numpy as np 
from scipy.interpolate import griddata 

import matplotlib.pyplot as plt 
import matplotlib.colors as mcl 
from matplotlib import rC#for latex 

import time as tm 
import sys 
import threading 
import Queue #queue in 3.2 and Queue in 2.7 ! 

import pdb #the debugger 

rc('text', usetex=True)#for latex 

map=0 #initialize the map index. It will be use to index the array like  this: array[map,[x,y]] 
time=np.zeros(1) #an array to store the time 
middle_h=np.zeros((0,3)) #x phi c 

#for the middle of the box 
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r') 
for line in current_file: 
    if line.startswith('# === time'): 
     map+=1 
     np.append(time,[float(line.strip('# === time '))]) 
    elif line.startswith('#'): 
     pass 
    else: 
     v=np.fromstring(line,dtype=float,sep=' ') 
     middle_h=np.vstack((middle_h,v[[1,3,4]])) 
current_file.close() 
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

##### 
def load_and_plot(): #will load a map file, and plot it along with the  corresponding profile loaded before 
    while not exit_flag: 
     print("fecthing work ...") 
     #try: 
     if not tasks_queue.empty(): 
      map_index=tasks_queue.get() 
      print("----> working on map: %s" %map_index) 
      x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),unpack=True, usecols=[1, 2,3]) 
      for i,el in enumerate(zp): 
       if el<0.: 
        zp[i]=0. 
      xv=np.unique(x) 
      yv=np.unique(y) 
      X,Y= np.meshgrid(xv,yv) 
      Z = griddata((x, y), zp, (X, Y),method='nearest') 

      figure=plt.figure(num=map_index,figsize=(14, 8)) 
      ax1=plt.subplot2grid((2,2),(0,0)) 
      ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b') 
      ax1.grid(True) 
      ax1.axis([-15, 15, 0, 1]) 
      ax1.set_title('Profiles') 
      ax1.set_ylabel(r'$\phi$') 
      ax1.set_xlabel('x') 

      ax2=plt.subplot2grid((2,2),(1,0)) 
      ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r') 
      ax2.grid(True) 
      ax2.axis([-15, 15, 0, 1]) 
      ax2.set_ylabel('c') 
      ax2.set_xlabel('x') 

      ax3=plt.subplot2grid((2,2),(0,1),rowspan=2,aspect='equal') 
      sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.) 
      figure.colorbar(sub_contour,ax=ax3) 
      figure.savefig('single_void_cyl_'+str(map_index)+'.png') 
      plt.close(map_index) 
      tasks_queue.task_done() 
     else: 
      print("nothing left to do, other threads finishing,sleeping 2 seconds...") 
      tm.sleep(2) 
    # except: 
    #  print("failed this time: %s" %map_index+". Sleeping 2 seconds") 
    #  tm.sleep(2) 
##### 
exit_flag=0 
nb_threads=2 
tasks_queue=Queue.Queue() 
threads_list=[] 

jobs=list(range(map)) #each job is composed of a map 
print("inserting jobs in the queue...") 
for job in jobs: 
    tasks_queue.put(job) 
print("done") 

#launch the threads 
for i in range(nb_threads): 
    working_bee=threading.Thread(target=load_and_plot) 
    working_bee.daemon=True 
    print("starting thread "+str(i)+' ...') 
    threads_list.append(working_bee) 
working_bee.start() 


#wait for all tasks to be treated 
tasks_queue.join() 

#flip the flag, so the threads know it's time to stop 
exit_flag=1 

for t in threads_list: 
    print("waiting for threads %s to stop..."%t) 
    t.join() 

print("all threads stopped") 
+1

我会推荐使用'multiprocessing'而不是线程。我成功地使用它来实现平行的数字绘图。 –

+0

谢谢,实现起来似乎比较复杂,但我会试一试。 – Napseis

+0

你只开始最后一个线程;在循环 – jfs

回答

3

继大卫的建议下,我做了它在多。 8个处理器的速度提升了5倍。我相信剩下的事情在我的脚本开始时对单一流程的工作是做的。 编辑:但是有时脚本“挂起”在最后的地图,即使它产生正确的地图,并显示以下错误:

文件“single_void_cyl_plot_mprocess.py”,行90,在tasks_queue.join()

文件 “/usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/queues.py”,线路316,在加入self._cond.wait()

文件“ /usr/local/epd-7.0-2-rh5-x86_64/lib/python2.7/multiprocessing/synchronize.py“,第220行,等待self._wait_semaphore.acquire(True,timeout)

import numpy as np 
from scipy.interpolate import griddata 

import matplotlib.pyplot as plt 
from matplotlib import rC#for latex 

from multiprocessing import Process, JoinableQueue 

import pdb #the debugger 

rc('text', usetex=True)#for latex 

map=0 #initialize the map index. It will be use to index the array  like this: array[map,x,y,...] 
time=np.zeros(1) #an array to store the time 
middle_h=np.zeros((0,3)) #x phi c 

#for the middle of the box 
current_file=open("single_void_cyl_periodic_phi_c_middle_h_out",'r') 
for line in current_file.readlines(): 
    if line.startswith('# === time'): 
     map+=1 
     np.append(time,[float(line.strip('# === time '))]) 
    elif line.startswith('#'): 
     pass 
    else: 
     v=np.fromstring(line,dtype=float,sep=' ') 
     middle_h=np.vstack((middle_h,v[[1,3,4]])) 
current_file.close() 
middle_h=middle_h.reshape((map,-1,3)) #3d array: map, x, phi,c 

####### 
def load_and_plot(): #will load a map file, and plot it along with  the corresponding profile loaded before 
    while tasks_queue.empty()==False: 
     print("fecthing work ...") 
     try: 
      map_index=tasks_queue.get() #get some work to do from  the queue 
      print("----> working on map: %s" %map_index) 
       x,y,zp=np.loadtxt("single_void_cyl_growth_periodic_post_map_"+str(map_index),\ 
       unpack=True, usecols=[1, 2,3]) 
      for i,el in enumerate(zp): 
       if el<0.: 
        zp[i]=0. 
      xv=np.unique(x) 
      yv=np.unique(y) 
      X,Y= np.meshgrid(xv,yv) 
      Z = griddata((x, y), zp, (X, Y),method='nearest') 

      figure=plt.figure(num=map_index,figsize=(14, 8)) 
      ax1=plt.subplot2grid((2,2),(0,0)) 
       ax1.plot(middle_h[map_index,:,0],middle_h[map_index,:,1],'*b') 
      ax1.grid(True) 
      ax1.axis([-15, 15, 0, 1]) 
      ax1.set_title('Profiles') 
      ax1.set_ylabel(r'$\phi$') 
      ax1.set_xlabel('x') 

      ax2=plt.subplot2grid((2,2),(1,0)) 
       ax2.plot(middle_h[map_index,:,0],middle_h[map_index,:,2],'*r') 
      ax2.grid(True) 
      ax2.axis([-15, 15, 0, 1]) 
      ax2.set_ylabel('c') 
      ax2.set_xlabel('x') 

      ax3=plt.subplot2grid((2,2), (0,1),rowspan=2,aspect='equal') 
       sub_contour=ax3.contourf(X,Y,Z,np.linspace(0,1,11),vmin=0.) 
      figure.colorbar(sub_contour,ax=ax3) 
       figure.savefig('single_void_cyl_'+str(map_index)+'.png') 
      plt.close(map_index) 
      tasks_queue.task_done() #work for this item finished 
     except: 
      print("failed this time: %s" %map_index) 
####### 

nb_proc=8 #number of processes 
tasks_queue=JoinableQueue() #a queue to pile up the work to do 

jobs=list(range(map)) #each job is composed of a map 
print("inserting jobs in the queue...") 
for job in jobs: 
    tasks_queue.put(job) 
print("done") 

#launch the processes 
for i in range(nb_proc): 
    current_process=Process(target=load_and_plot) 
    current_process.start() 

#wait for all tasks to be treated 
tasks_queue.join() 
+0

你也可以尝试'pool = multiprocessing.Pool()''for result in pool.imap_unordered(process_job,jobs):pass' – jfs

+0

I可能会尝试,似乎在加入某个地方有一个错误...它不会每次都发生,但它使我困惑。 – Napseis