2010-10-03 19 views
1

我想延长在这里描述的Queue.PriorityQueue:http://docs.python.org/library/queue.html#Queue.PriorityQueue扩展Python Queue.PriorityQueue(工人优先,工作的封装类型)

队列将举行工作包具有优先级。工人将获得工作包并处理它们。我想做以下补充:

  1. 工人也有一个优先。当多名员工闲置时,具有最高优先级的员工应该处理一个即将到来的工作包。

  2. 并非每个工人都可以处理每个工作包,因此需要一种机制来检查工作包类型和工作者能力是否匹配。

我在寻找提示,如何最好的实现(从头开始,扩展PrioriyQueue或Queue,...)。

编辑

这是我第一次(未经测试)试试。基本思想是所有等待的线程都会被通知。然后他们都试图通过_choose_worker(self, worker)获得工作项目。 (产地它社区维基)

编辑

作品对于现在一些简单的测试......

编辑 添加自定义BaseManager和工人名单的本地副本在_choose_worker函数中。

编辑 bug修复

import Queue 
from Queue import Empty, Full 
from time import time as _time 
import heapq 

class AdvancedQueue(Queue.PriorityQueue): 

    # Initialize the queue representation 
    def _init(self, _maxsize): 
     self.queue = [] 
     self.worker = [] 

    def put(self, item, block=True, timeout=None): 
     ''' 
     Put an item into the queue. 

     If optional args 'block' is true and 'timeout' is None (the default), 
     block if necessary until a free slot is available. If 'timeout' is 
     a positive number, it blocks at most 'timeout' seconds and raises 
     the Full exception if no free slot was available within that time. 
     Otherwise ('block' is false), put an item on the queue if a free slot 
     is immediately available, else raise the Full exception ('timeout' 
     is ignored in that case). 
     ''' 
     self.not_full.acquire() 
     try: 
      if self.maxsize > 0: 
       if not block: 
        if self._qsize() == self.maxsize: 
         raise Full 
       elif timeout is None: 
        while self._qsize() == self.maxsize: 
         self.not_full.wait() 
       elif timeout < 0: 
        raise ValueError("'timeout' must be a positive number") 
       else: 
        endtime = _time() + timeout 
        while self._qsize() == self.maxsize: 
         remaining = endtime - _time() 
         if remaining <= 0.0: 
          raise Full 
         self.not_full.wait(remaining) 
      self._put(item) 
      self.unfinished_tasks += 1 
      self.not_empty.notifyAll() # only change 
     finally: 
      self.not_full.release() 

    def get(self, worker, block=True, timeout=None): 
     self.not_empty.acquire() 
     try: 
      self._put_worker(worker) 

      if not block: 
       if not self._qsize(): 
        raise Empty 
       else: 
        return self._choose_worker(worker) 
      elif timeout is None: 
       while True: 
        while not self._qsize(): 
         self.not_empty.wait() 
        try: 
         return self._choose_worker(worker) 
        except Empty: 
         self.not_empty.wait() 

      elif timeout < 0: 
       raise ValueError("'timeout' must be a positive number") 
      else: 
       endtime = _time() + timeout 
       def wait(endtime): 
        remaining = endtime - _time() 
        if remaining <= 0.0: 
         raise Empty 
        self.not_empty.wait(remaining) 

       while True: 
        while not self._qsize(): 
         wait(endtime) 

        try: 
         return self._choose_worker(worker) 
        except Empty: 
         wait(endtime) 
     finally: 
      self._remove_worker(worker) 
      self.not_empty.release() 

    # Put a new worker in the worker queue 
    def _put_worker(self, worker, heappush=heapq.heappush): 
     heappush(self.worker, worker) 

    # Remove a worker from the worker queue 
    def _remove_worker(self, worker): 
     self.worker.remove(worker) 

    # Choose a matching worker with highest priority 
    def _choose_worker(self, worker): 
     worker_copy = self.worker[:] # we need a copy so we can remove assigned worker 
     for item in self.queue: 
      for enqueued_worker in worker_copy: 
       if item[1].type in enqueued_worker[1].capabilities: 
        if enqueued_worker == worker: 
         self.queue.remove(item) 
         self.not_full.notify() 
         return item 
        else: 
         worker_copy.remove(enqueued_worker) 
         # item will be taken by enqueued_worker (which has higher priority), 
         # so enqueued_worker is busy and can be removed 
         continue 
     raise Empty 
+1

+1有趣的问题。我有一个想法,但我想首先看到其他答案。我现在只想给你一个小提示:注意有免费工作和两名工人可以免费工作的情况,但是最高优先级的工作人员无法处理队列中的工作。小心你不要陷入僵局。类似的情况是,你有两个任务,一个工作人员和工作人员无法处理最高优先级的工作,再次注意死锁。你应该单元测试这些情况(还有更多的测试用于其他更常见的场景 - 空队列等)。 – 2010-10-03 09:06:14

+0

非常适合我开始使用python单元测试:) – tauran 2010-10-03 09:26:06

回答

0

唯一的答案是有用的,但不够详细,所以我现在会接受我自己的答案。查看问题中的代码。

1

我认为你是说明您有两个“优先级队列”的情况 - 一个用于工作,一个用于工作人员。幼稚的做法是把最重要的工作和最重要的工作人员,并尝试将它们配对。但当工人无法执行工作时,这当然会失败。

为了解决这个问题,我建议首先考虑优先考虑的工作,然后按照优先级递减的顺序迭代所有工作人员,直到找到可以处理该工作的工作人员。如果没有工人可以处理这项工作,那么就采取第二优先级的工作,等等。所以有效地你有嵌套循环,如下所示:

def getNextWorkerAndJobPair(): 
    for job in sorted(jobs, key=priority, reverse=True): 
     for worker in sorted(workers, key=priority, reverse=True): 
      if worker.can_process(job): 
       return (worker, job) 

上面的示例虽然多次不必要地排序数据。为了避免这种情况,最好以已排序的顺序存储数据。至于要使用什么数据结构,我不确定最好是什么。理想情况下,您需要O(log n)插入和删除,并且能够在O(n)时间以排序顺序遍历集合。我认为PriorityQueue满足这些要求中的第一个,但不是第二个。我想从blist包的排序列表可以工作,但我没有尝试过,我的网页没有针对此类提供的性能保证。

我建议首先遍历作业,然后遍历内部循环中的工作者的方式并不是您可以采用的唯一方法。您也可以颠倒循环的顺序,以便首先选择最高优先级的工作人员,然后尝试为其找到工作。或者你可以找到对某个函数f有最大值f(priority_job,priority_worker)的有效(job,worker)对(例如,只需添加优先级)。

+0

对于我的编辑,我遵循了你的方法。首先我会试着让它工作,然后考虑调整它。我也想从'PriorityQueue'中重用尽可能多的代码。 – tauran 2010-10-04 11:49:17