2012-09-26 99 views
1

我需要实现一个系统,其中包含一个主进程,用于管理执行其他任务的从进程。我有两种不同的从机类型,每个从机需要6个实例。我写了一些可以工作的东西,但是它会杀死每个进程,并在任务完成时启动一个新进程。这是不可取的,因为产生新的过程是昂贵的。我希望保持每个从站作为一个进程运行,并在完成时收到通知,并用新的输入重新运行。在Python中管理固定数量的工作人员

我目前的伪代码如下。它并不完美;因为我没有与我的实际代码,所以我正在进行讨论。

# SlaveTypeB is pretty much the same. 
class SlaveTypeA(multiprocessing.Process): 
    def __init__(self, val): 
     self.value = val 
     self.result = multiprocessing.Queue(1) 
     self.start() 
    def run(self): 
     # In real life, run does something that takes a few seconds. 
     sleep(2) 
     # For SlaveTypeB, assume it writes self.val to a file instead of incrementing 
     self.result.put(self.val + 1) 
    def getResult(self): 
     return self.result.get()[0] 


if __name__ == "__main__": 
    MAX_PROCESSES = 6 
    # In real life, the input will grow as the while loop is being processed 
    input = [1, 4, 5, 6, 9, 6, 3, 3] 
    aProcessed = [] 
    aSlaves = [] 
    bSlaves = [] 

    while len(input) > 0 or len(aProcessed) > 0: 
     if len(aSlaves) < MAX_PROCESSES and len(input) > 0: 
      aSlaves.append(SlaveTypeA(input.pop(0)) 
     if len(bSlaves) < MAX_PROCESSES and len(aProcessed) > 0 : 
      bSlaves.append(SlaveTypeB(aProcesssed.pop(0)) 
     for aSlave in aSlaves: 
      if not aSlave.isAlive(): 
       aProcessed = aSlave.getResult() 
       aSlaves.remove(aSlave) 
     for bSlave in bSlaves: 
      if not bSlave.isAlive(): 
       bSlaves.remove(bSlave) 

我该怎么做才能让slaves和bSlaves中的进程不会被杀死并重新生成。我在想我可以使用管道,但是我不知道如何知道流程何时完成,而无需等待。

编辑 我重写了这个使用管道,它解决了我的问题,无法保持进程运行。仍然希望以最好的方式进行输入。我忽略了slaveB部分,因为只有一个工人类型可以简化问题。

class Slave(Process) 
    def __init__(self, id): 
     # Call super init, set id, set idlestate = true, etc 
     self.parentCon, self.childCon = Pipe() 
     self.start() 

    def run(self): 
     while True: 
      input = self.childCon.recv() 
      # Do something here in real life 
      sleep(2) 
      self.childCon.send(input + 1) 

    #def isIdle/setIdle(): 
     # Getter/setter for idle 

    def tryGetResult(self):    
     if self.parentCon.poll(): 
      return self.parentCon.recv() 
     return False 

    def process(self, input): 
     self.parentConnection.send(input) 

if __name__ == '__main__' 
    MAX_PROCESSES = 6 
    jobs = [1, 4, 5, 6, 9, 6, 3, 3] 
    slaves = [] 
    for int i in range(MAX_PROCESSES): 
     slaves.append(Slave(i)) 
    while len(jobs) > 0: 
     for slave in slaves: 
      result = slave.tryGetResult() 
      if result: 
       # Do something with result 
       slave.setIdle(True) 
      if slave.isIdle(): 
       slave.process(jobs.pop()) 
       slave.setIdle(False) 

EDIT 2 得到它,请参阅下面回答。

+1

你看着'multiprocessing.Pool'? http://docs.python.org/library/multiprocessing.html#using-a-pool-of-workers – tMC

+0

我不认为有可能让池进程退出并重用它们,是吗? –

回答

0

看起来像使用SyncManager是最好的选择了这种情况。

类硕士(SyncManager): 通

输入= [1,4,5,6,9,6,6,3,3] DEF getNextInput(): #检查输入ISN”吨空第一 返回input.pop()

如果名称 == “ ”: MAX_PROCESSES = 6 Master.register(“ getNextInput”,getNextInput) 米=主(( '本地主机' ,5000)) m.start() 为我的range(MAX_PROCESSES): 从() 而真: 通

class Slave(Process): 
    def __init__(self): 
     multiprocessing.Process.__init__(self) 
     self.start() 
    def run(self): 
     Master.register("getNextInput", getNextInput) 
     m = Master(('localhost', 5000)) 
     m.connect() 
     while True: 
      input = m.getNextInput() 
      # Check for None first 
      self.process(input) 
    def process(self): 
     print "Processed " + str(input) 
0

创建两个队列? 像worktodoAworktodoB,让你的工人闲置,而他们等待某些东西被放入队列中,如果放在那里的东西让我们说'退出',他们会退出?

否则,你应该给tMCs comment一个镜头

+0

这就是我现在正在做的事情。问题是我需要知道工人何时完成。 –

+0

另一个队列呢?像“闲暇工作者”可能有点笨拙... – Willy

相关问题