2011-12-16 38 views
3

Recovering lost multiprocessing.Queue items when a worker process dies

My situation is this:

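  • I have a worker that enqueues tasks into a multiprocessing.Queue() whenever the queue is empty. This ensures that tasks are executed according to a certain priority, because multiprocessing.Queue() does not implement priorities.
  • A number of workers pop from the mp.Queue and do some work. Occasionally (< 0.1%) one of them fails and dies without any chance of re-enqueuing its task.
  • My tasks are locked through a central database and may run only once (hard requirement). For this purpose they have a set of states they can transition from and to.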

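My current solution: every worker reports over a second queue which task it has completed, and each task gets a deadline by which it must be finished. If the deadline passes, the task is reset and re-enqueued. The problem is that this solution is "soft": the deadline is arbitrary.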
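The deadline bookkeeping described above could look roughly like the following. This is only a sketch: `DeadlineTracker`, `mark_started`, `mark_finished` and `pop_expired` are hypothetical names, not part of the original setup, and resetting the task state in the database is left to the caller.

```python
import time

class DeadlineTracker:
    """Remember when each started task must be finished (hypothetical helper)."""

    def __init__(self, timeout):
        self.timeout = timeout   # seconds a task may run before it is reset
        self.deadlines = {}      # task id -> absolute deadline

    def mark_started(self, task, now=None):
        now = time.monotonic() if now is None else now
        self.deadlines[task] = now + self.timeout

    def mark_finished(self, task):
        # An acknowledgement for a task that is no longer tracked is ignored
        self.deadlines.pop(task, None)

    def pop_expired(self, now=None):
        """Return the tasks past their deadline and stop tracking them."""
        now = time.monotonic() if now is None else now
        expired = [t for t, d in self.deadlines.items() if d <= now]
        for t in expired:
            del self.deadlines[t]
        return expired
```

Every task returned by `pop_expired()` would be reset and put back on the queue. The `timeout` value is still a guess, which is exactly the "soft" part of this approach.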

I am looking for the simplest possible solution. Is there a simpler or more rigorous one?


Could you change the workers so that, when a task fails, the worker re-enqueues the task itself? – unutbu 2011-12-16 10:34:38


What happens when a task fails? Can you catch a Python exception? – 2011-12-16 12:53:57
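What the two comments suggest might be sketched as follows. `run_task` and `handler` are hypothetical names used only for illustration. This only helps when the failure surfaces as a Python exception; a process that is killed or crashes hard never reaches the `except` branch.

```python
def run_task(task, handler, todo_q):
    """Run handler(task); on a Python exception, put the task back on todo_q.

    Returns True if the task completed, False if it was re-enqueued.
    todo_q is any object with a put() method, e.g. a multiprocessing.Queue.
    """
    try:
        handler(task)
        return True
    except Exception:
        # Assumption: the task can safely be retried after a failure
        todo_q.put(task)
        return False
```

With the run-once requirement from the question, the task's state in the database would presumably have to be reset before the task is put back.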

Answers

3

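This solution uses three queues to keep track of the work (simulated as WORK_ID):

  • todo_q: any work that still has to be done (including work that must be redone because the process died mid-flight)
  • start_q: any work that a process has started
  • finish_q: any work that has been completed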

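With this approach you should not need a timer. As long as you assign process identifiers and keep track of the assignments, you can check Process.is_alive(). If the process has died, add its work back to the todo queue.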
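The core of that check can be isolated as a small pure function. This is a sketch under assumptions: `find_lost_work` is a hypothetical name, `assignments` maps a worker id to a `(worker_id, task)` tuple, and `alive` maps a worker id to the result of `Process.is_alive()`.

```python
def find_lost_work(assignments, alive):
    """Return {worker_id: task} for every assignment held by a dead worker.

    assignments: dict worker_id -> (worker_id, task)
    alive:       dict worker_id -> bool, e.g. built from Process.is_alive()
    A worker missing from `alive` is treated as dead.
    """
    return {wid: ack[1] for wid, ack in assignments.items()
            if not alive.get(wid, False)}
```

Each task in the result would go back onto the todo queue, and the dead worker would be replaced.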

In the code below, I simulate a worker process dying 25% of the time...

from multiprocessing import Process, Queue
from queue import Empty
from random import choice as rndchoice
import time

def worker(id, todo_q, start_q, finish_q):
    """multiprocessing worker"""
    msg = None
    while msg != 'DONE':
        try:
            msg = todo_q.get_nowait()  # Poll non-blocking on todo_q
            if msg != 'DONE':
                start_q.put((id, msg))  # Let the controller know work started
                time.sleep(0.05)
                if rndchoice(range(4)) == 1:
                    # Die 25% of the time (1 in 4) before finishing
                    print("DEATH to worker %s who had task=%s" % (id, msg))
                    break
                finish_q.put((id, msg))  # Acknowledge work finished
        except Empty:
            pass
    return

if __name__ == '__main__':
    NUM_WORKERS = 5
    WORK_ID = set(['A', 'B', 'C', 'D', 'E'])  # Work to be done, you will need to
                                              # name work items so they are unique
    WORK_DONE = set()     # Work that has been done
    ASSIGNMENTS = dict()  # Who was assigned a task
    workers = dict()
    todo_q = Queue()
    start_q = Queue()
    finish_q = Queue()

    print("Starting %s tasks" % len(WORK_ID))
    # Add work
    for work in WORK_ID:
        todo_q.put(work)

    # spawn workers
    for ii in range(NUM_WORKERS):
        p = Process(target=worker, args=(ii, todo_q, start_q, finish_q))
        workers[ii] = p
        p.start()

    finished = False
    while True:
        try:
            start_ack = start_q.get_nowait()  # Poll for work started
            # Check for race condition between start_ack and finished_ack
            if start_ack[0] not in ASSIGNMENTS:
                ASSIGNMENTS[start_ack[0]] = start_ack  # Track the assignment
                print("ASSIGNED worker=%s task=%s" % (start_ack[0],
                                                      start_ack[1]))
                WORK_ID.remove(start_ack[1])  # Account for started tasks
            else:
                # Race condition. Never overwrite existing assignments
                # Wait until the ASSIGNMENT is cleared
                start_q.put(start_ack)
        except Empty:
            pass

        try:
            finished_ack = finish_q.get_nowait()  # Poll for work finished
            # Check for race condition between start_ack and finished_ack
            assigned = ASSIGNMENTS.get(finished_ack[0])
            if assigned is not None and assigned[1] == finished_ack[1]:
                # Clean up after the finished task
                print("REMOVED worker=%s task=%s" % (finished_ack[0],
                                                     finished_ack[1]))
                del ASSIGNMENTS[finished_ack[0]]
                WORK_DONE.add(finished_ack[1])
            else:
                # Race condition. Never overwrite existing assignments
                # It was received out of order... wait for the 'start_ack'
                finish_q.put(finished_ack)
            finished_ack = None
        except Empty:
            pass

        # Look for any dead workers, and put their work back on the todo_q
        if not finished:
            # Iterate over a copy, because workers is modified inside the loop
            for id, p in list(workers.items()):
                if not p.is_alive():
                    if id not in ASSIGNMENTS:
                        # Its start_ack has not been processed yet;
                        # check this worker again on the next pass
                        continue
                    print("    WORKER %s FAILED!" % id)
                    # Add to the work again...
                    todo_q.put(ASSIGNMENTS[id][1])
                    WORK_ID.add(ASSIGNMENTS[id][1])
                    del ASSIGNMENTS[id]  # Worker is dead now
                    del workers[id]
                    ii += 1  # Next unused worker id
                    print("Spawning worker number", ii)
                    # Respawn a worker to replace the one that died
                    p = Process(target=worker, args=(ii, todo_q, start_q,
                                                     finish_q))
                    workers[ii] = p
                    p.start()
        else:
            # All work is done: wait for the workers to see 'DONE' and exit
            for id, p in list(workers.items()):
                p.join()
                del workers[id]
            break

        # Nothing left to start and nothing in flight: tell the workers to stop
        if not WORK_ID and not ASSIGNMENTS:
            finished = True
            for _ in range(NUM_WORKERS):
                todo_q.put('DONE')
    print("We finished %s tasks" % len(WORK_DONE))

Running this on my laptop...

$ python queueack.py
Starting 5 tasks 
ASSIGNED worker=2 task=C 
ASSIGNED worker=0 task=A 
ASSIGNED worker=4 task=B 
ASSIGNED worker=3 task=E 
ASSIGNED worker=1 task=D 
DEATH to worker 4 who had task=B 
DEATH to worker 3 who had task=E 
    WORKER 3 FAILED! 
Spawning worker number 5 
    WORKER 4 FAILED! 
Spawning worker number 6 
REMOVED worker=2 task=C 
REMOVED worker=0 task=A 
REMOVED worker=1 task=D 
ASSIGNED worker=0 task=B 
ASSIGNED worker=2 task=E 
REMOVED worker=2 task=E 
DEATH to worker 0 who had task=B 
    WORKER 0 FAILED! 
Spawning worker number 7 
ASSIGNED worker=5 task=B 
REMOVED worker=5 task=B 
We finished 5 tasks 
$

I tested this with 10000 work items at a 25% death rate.