这是我的工作人员:
def UpdateProcesses(start,processnumber,CachesThatRequireCalculating,CachesThatAreBeingCalculated,CacheDict,CacheLock,IdleLock,FileDictionary,MetaDataDict,CacheIndexDict):
NewPool()
while start[processnumber]:
IdleLock.wait()
while len(CachesThatRequireCalculating)>0 and start[processnumber] == True:
CacheLock.acquire()
try:
cacheCode = CachesThatRequireCalculating[0] # The list can be empty if an other process takes the last item during the CacheLock
CachesThatRequireCalculating.remove(cacheCode)
print cacheCode,"starts processing by",processnumber,"process"
except:
CacheLock.release()
else:
CacheLock.release()
CachesThatAreBeingCalculated.append(cacheCode[:3])
Array,b,f = TIPP.LoadArray(FileDictionary[cacheCode[:2]])#opens the dask array
Array = ((Array[:,:,CacheIndexDict[cacheCode[:2]][cacheCode[2]]:CacheIndexDict[cacheCode[:2]][cacheCode[2]+1]].compute()/2.**(MetaDataDict[cacheCode[:2]]["Bit Depth"])*255.).astype(np.uint16)).transpose([1,0,2]) #slices and calculates the array
f.close() #close the file
if CachesThatAreBeingCalculated.count(cacheCode[:3]) != 0: #if not, this cache is not needed annymore (the cacheCode is removed bij wavelengthchange)
CachesThatAreBeingCalculated.remove(cacheCode[:3])
try: #If the first time the object if not aivalable try a second time
CacheDict[cacheCode[:3]] = Array
except:
CacheDict[cacheCode[:3]] = Array
print cacheCode,"done processing by",processnumber,"process"
if start[processnumber]:
IdleLock.clear()
这是我如何开始他们:
self.ProcessLst = [] #list with all the processes who calculate the caches
for processnumber in range(min(NumberOfMaxProcess,self.processes)):
self.ProcessTerminateLst.append(True)
for processnumber in range(min(NumberOfMaxProcess,self.processes)):
self.ProcessLst.append(process.Process(target=Proc.UpdateProcesses,args=(self.ProcessTerminateLst,processnumber,self.CachesThatRequireCalculating,self.CachesThatAreBeingCalculated,self.CacheDict,self.CacheLock,self.IdleLock,self.FileDictionary,self.MetaDataDict,self.CacheIndexDict,)))
self.ProcessLst[-1].daemon = True
self.ProcessLst[-1].start()
我关闭它们是这样的:
for i in range(len(self.ProcessLst)): #For both while loops in the processes self.ProcessTerminateLst[i] must be True. So or the process is now ready to be terminad or is still in idle mode.
self.ProcessTerminateLst[i] = False
self.IdleLock.set() #Makes sure no process is in Idle and all are ready to be terminated
我可以在我的作业使用睡眠并减少或增加该值以便拥有更多或更少的作业,但是我想要的是更改并发线程的数量(因为每个线程都是ac oncurrent连接,我正在写一个负载模拟器的服务器,我需要模拟热身的缓存),所以这些技巧将无法在我的情况下工作 –
我有一个类似的情况下,我用管理器来管理之间进程和我的线程中有while循环。所以我只是启动了x个线程,如果我想关闭它们,我在while循环中将该变量设置为True,并且线程退出。该变量位于管理器列表中,因此您可以在主线程中对其进行修改。我可以告诉你我的代码给你一个想法,我是如何解决它的,但我是这个论坛的新手,不知道如何。 –
嗯...看起来像只能减少,不会增加... 和我使用全局变量,因此我可以在线程之间共享状态^^ –