我想延长在这里描述的Queue.PriorityQueue:http://docs.python.org/library/queue.html#Queue.PriorityQueue扩展Python Queue.PriorityQueue(工人优先,工作的封装类型)
队列将举行工作包具有优先级。工人将获得工作包并处理它们。我想做以下补充:
工人也有一个优先。当多名员工闲置时,具有最高优先级的员工应该处理一个即将到来的工作包。
并非每个工人都可以处理每个工作包,因此需要一种机制来检查工作包类型和工作者能力是否匹配。
我在寻找提示,如何最好的实现(从头开始,扩展PrioriyQueue或Queue,...)。
编辑
这是我第一次(未经测试)试试。基本思想是所有等待的线程都会被通知。然后他们都试图通过_choose_worker(self, worker)
获得工作项目。 (产地它社区维基)
编辑
作品对于现在一些简单的测试......
编辑 添加自定义BaseManager
和工人名单的本地副本在_choose_worker
函数中。
编辑 bug修复
import Queue
from Queue import Empty, Full
from time import time as _time
import heapq
class AdvancedQueue(Queue.PriorityQueue):
# Initialize the queue representation
def _init(self, _maxsize):
self.queue = []
self.worker = []
def put(self, item, block=True, timeout=None):
'''
Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
'''
self.not_full.acquire()
try:
if self.maxsize > 0:
if not block:
if self._qsize() == self.maxsize:
raise Full
elif timeout is None:
while self._qsize() == self.maxsize:
self.not_full.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
while self._qsize() == self.maxsize:
remaining = endtime - _time()
if remaining <= 0.0:
raise Full
self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notifyAll() # only change
finally:
self.not_full.release()
def get(self, worker, block=True, timeout=None):
self.not_empty.acquire()
try:
self._put_worker(worker)
if not block:
if not self._qsize():
raise Empty
else:
return self._choose_worker(worker)
elif timeout is None:
while True:
while not self._qsize():
self.not_empty.wait()
try:
return self._choose_worker(worker)
except Empty:
self.not_empty.wait()
elif timeout < 0:
raise ValueError("'timeout' must be a positive number")
else:
endtime = _time() + timeout
def wait(endtime):
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
while True:
while not self._qsize():
wait(endtime)
try:
return self._choose_worker(worker)
except Empty:
wait(endtime)
finally:
self._remove_worker(worker)
self.not_empty.release()
# Put a new worker in the worker queue
def _put_worker(self, worker, heappush=heapq.heappush):
heappush(self.worker, worker)
# Remove a worker from the worker queue
def _remove_worker(self, worker):
self.worker.remove(worker)
# Choose a matching worker with highest priority
def _choose_worker(self, worker):
worker_copy = self.worker[:] # we need a copy so we can remove assigned worker
for item in self.queue:
for enqueued_worker in worker_copy:
if item[1].type in enqueued_worker[1].capabilities:
if enqueued_worker == worker:
self.queue.remove(item)
self.not_full.notify()
return item
else:
worker_copy.remove(enqueued_worker)
# item will be taken by enqueued_worker (which has higher priority),
# so enqueued_worker is busy and can be removed
continue
raise Empty
+1有趣的问题。我有一个想法,但我想首先看到其他答案。我现在只想给你一个小提示:注意有免费工作和两名工人可以免费工作的情况,但是最高优先级的工作人员无法处理队列中的工作。小心你不要陷入僵局。类似的情况是,你有两个任务,一个工作人员和工作人员无法处理最高优先级的工作,再次注意死锁。你应该单元测试这些情况(还有更多的测试用于其他更常见的场景 - 空队列等)。 – 2010-10-03 09:06:14
非常适合我开始使用python单元测试:) – tauran 2010-10-03 09:26:06