2011-07-01 26 views
2

我有一个包含工作项的队列,我希望多个线程在这些项目上并行工作。 处理工作项目时,可能会产生新的工作项目。我遇到的问题是,我无法找到解决办法,如何确定我是否完成。工人看起来是这样的:如何处理可能会创建新工作项的多个工作线程

public class Worker implements Runnable { 
    public void run() { 
    while (true) { 
     WorkItem item = queue.nextItem(); 
     if (item != null) { 
     processItem(item); 
     } 
     else { 
     // the queue is empty, but there may still be other workers 
     // processing items which may result in new work items 
     // how to determine if the work is completely done? 
     } 
    } 
    } 
} 

这似乎是一个非常简单的问题,但实际上我很茫然。什么是最好的实施方式?

感谢

澄清: 工作线程必须终止,一旦他们没有被处理的项目,但只要其中至少有一个是仍在工作,他们必须等待,因为这可能会导致新的工作项目。

+1

通常您会等待某种同步对象,并在每次将新项目添加到队列时收到通知。 – sje397

回答

0

我建议等待/通知电话。在其他情况下,工作线程会等待对象,直到队列通知需要做更多工作。当工作人员创建新项目时,它会将其添加到队列中,并且队列调用会通知工作人员正在等待的对象。其中一人将醒来消耗新的物品。

类Object的wait,notify和notifyAll方法支持从一个线程到另一个线程的有效控制转移。线程可以使用等待来暂停自己,而不是简单地“旋转”(反复地锁定和解锁对象以查看某个内部状态是否发生了变化),直到另一个线程使用通知唤醒为止。这对于线程具有生产者 - 消费者关系(主动协作以实现共同目标)而不是互斥关系(试图避免共享公共资源时发生冲突)的情况尤为适用。

Source: Threads and Locks

+0

嗨。感谢您的回答。但是当所有的工人都在等待时,就不会有更多的工作,所以在这种情况下,我需要退出循环。而正是这是我的问题,即我需要线程终止,一旦他们都没有工作。 – Stefan

+0

有趣。队列需要知道所有工人的状态。我会用while(!queue.workComplete())替换while(true)行。每当请求新项目并且当前没有新工作时,请让队列检查工作人员的状态。如果他们都报告他们已经完成了他们的任务,请为workComplete()返回true,然后在工人等待的对象上调用NotifyAll。他们都醒了,发现没有更多的工作要来了,终止。可能需要一些同步技巧来解决,但我相当确定它可以工作。 – Coeffect

+0

谢谢,这听起来可行,我会考虑一下。 – Stefan

0

我想看看什么水平高于等待/通知。要正确并避免死锁是非常困难的。你看过java.util.concurrent.CompletionService<V>?您可以有一个简单的经理线程来轮询服务和结果,这些结果可能包含或不包含新的工作项目。

0

使用BlockingQueue包含的项目与同步组跟踪的所有元素来处理沿着当前正在处理:

BlockingQueue<WorkItem> bQueue; 
Set<WorkItem> beingProcessed = new Collections.synchronizedSet(new HashMap<WorkItem>()); 
bQueue.put(workItem); 
... 

// the following runs over many threads in parallel 
while (!(bQueue.isEmpty() && beingProcessed.isEmpty())) { 
    WorkItem currentItem = bQueue.poll(50L, TimeUnit.MILLISECONDS); // null for empty queue 
    if (currentItem != null) { 
    beingProcessed.add(currentItem); 
    processItem(currentItem); // possibly bQueue.add(newItem) is called from processItem 
    beingProcessed.remove(currentItem); 
    } 
} 

编辑:作为@Hovercraft完整的鳗鱼建议,一个ExecutorService可能是你应该用什么。随时随地添加新任务。您可以半忙等待以executorService.awaitTermination(time, timeUnits)定期终止所有任务,并在此之后终止所有线程。

+0

谢谢,这非常有帮助。不幸的是,由于代表低,我无法赞成。 – Stefan

0

下面是排队的开始,以解决您的问题。基本上,您需要在过程工作中跟踪新工作

public class WorkQueue<T> { 
    private final List<T> _newWork = new LinkedList<T>(); 
    private int _inProcessWork; 

    public synchronized void addWork(T work) { 
     _newWork.add(work); 
     notifyAll(); 
    } 

    public synchronized T startWork() throws InterruptedException { 
     while(_newWork.isEmpty() && (_inProcessWork > 0)) { 
      wait(); 
      if(!_newWork.isEmpty()) { 
       _inProcessWork++; 
       return _newWork.remove(0); 
      } 
     } 
     // everything is done 
     return null; 
    } 

    public synchronized void finishWork() { 
     _inProcessWork--; 
     if((_inProcessWork == 0) && _newWork.isEmpty()) { 
      notifyAll(); 
     } 
    } 
} 

你的工人将大致如下:

public class Worker { 
    private final WorkQueue<T> _queue; 

    public void run() { 
    T work = null; 
    while((work = _queue.startWork()) != null) { 
     try { 
     // do work here... 
     } finally { 
     _queue.finishWork(); 
     } 
    } 
    } 
} 

的一个技巧是,你需要添加的第一个工作项_before启动任何人员(否则他们将全部立即退出)。

+0

非常感谢您的努力,这看起来相当不错。不幸的是我还不能满足。 – Stefan