2012-01-04 26 views
0

我有一种情况,我有2个阻塞队列。第一个插入一些我执行的任务。每个任务完成后,它会将任务添加到第二个队列中,并在其中执行。Java BlockingQueue在take()上阻塞,稍微扭曲

所以我的第一个队列很简单:我只是检查,以确保它不是空的,执行,否则我打断():

public void run() { 
    try { 
     if (taskQueue1.isEmpty()) { 
      SomeTask task = taskQueue1.poll(); 
      doTask(task); 
      taskQueue2.add(task); 
     } 
     else { 
      Thread.currentThread().interrupt(); 
     } 
    } 

    catch (InterruptedException ex) { 
     ex.printStackTrace(); 
    } 
} 

第二个我这样做,正如你所知道的,不工作:

public void run() { 
    try { 
     SomeTask2 task2 = taskQueue2.take(); 
     doTask(task2); 
    } 

    catch (InterruptedException ex) { 

    } 
    Thread.currentThread().interrupt(); 

} 

你将如何解决这个问题,使得第二的BlockingQueue不上取(块),但完成,只有当它知道没有添加更多的项目。如果第二个线程可能会看到第一个阻塞队列,并且检查它是否为空,并且第二个队列也是空的,那么它会很好,然后它会中断。

我也可以使用Poison对象,但宁愿选择别的。

注意:这不是确切的代码,只是我在这里写下:

+1

我无法弄清楚你在这里要做什么。请解释。 – 2012-01-04 01:05:27

回答

1

您听起来好像处理第一个队列的线程知道在其队列耗尽后不会再有任务进入。这听起来很可疑,但我会随时听取你的意见并提出解决方案。

定义两个线程均可见的AtomicInteger。初始化为正数

定义第一线程的操作如下:

  • 环路上Queue#poll()
  • 如果Queue#poll()返回null,则在共享整数上调用AtomicInteger#decrementAndGet()
    • 如果AtomicInteger#decrementAndGet()返回零,则通过Thread#interrupt()中断第二个线程。 (这处理了没有物品到达的情况。)
    • 无论哪种情况,退出循环。
  • 否则,处理提取的项目,在共享整数上调用AtomicInteger#incrementAndGet(),将提取的项目添加到第二个线程的队列中,并继续循环。

定义线程的操作如下:

  • 环路阻塞BlockingQueue#take()
  • 如果BlockingQueue#take()引发InterruptedException,发现异常,请致电Thread.currentThread().interrupt(),并退出循环。
  • 否则,处理提取的项目。
  • 在共享整数上调用AtomicInteger#decrementAndGet()
    • 如果AtomicInteger#decrementAndGet()返回零,则退出循环。
    • 否则,继续循环。

确保您了解尝试写实际的代码之前的想法。合约是第二个线程继续等待队列中的更多项目,直到预期任务的计数达到零。此时,生产线程(第一个)不再将任何新项目推送到第二个线程的队列中,因此第二个线程知道停止为其队列服务是安全的。

没有任务到达第一个线程的队列时,就会出现扭曲的情况。由于第二个线程仅在之后递减并测试的计数,所以它处理物品,如果它从来没有机会处理任何物品,则它不会考虑停止。我们使用线程中断来处理这种情况,代价是第一个线程循环终止步骤中的另一个条件分支。幸运的是,该分支只能执行一次。

有许多设计可以在这里工作。我只描述了一个只引入了一个额外的实体—的共享原子整数—,但即使如此,它很费劲。我认为使用毒丸会更清洁,但我确实承认Queue#add()BlockingQueue#put()均不接受null作为有效元素(由于Queue#poll()的返回值合同)。否则将很容易使用null作为毒丸

+0

谢谢。我将尝试首先使用毒药丸 – 2012-01-04 02:59:16

+0

我不知道我可以使用毒药丸,因为有多个消耗队列。 – 2012-01-04 03:35:42

+0

@ DominicBou-Samra - 如果你解释了你的代码试图达到什么目的,也许有人可以帮助你以更明智的方式做到这一点。 – 2012-01-04 04:14:43

2

我想不出什么你实际上是试图在这里做的,但我可以说,interrupt()在你第一次run()方法是无意义的或错误的。

  • 如果您正在运行在自己的Thread对象run()方法,那么线程大概要退出,所以没有点中断它。

  • 如果您正在一个带有线程池的执行程序中运行run()方法,那么您很可能不希望终止该线程或关闭执行程序......此时。如果你想关闭执行程序,那么你应该调用其中一种关闭方法。


例如,这里有一个版本什么呢,你似乎怎么做,而不全部中断的东西,没有线程创建/销毁流失。

public class TaskExecutor { 

    private ExecutorService executor = new ThreadPoolExecutorService(...); 

    public void submitTask1(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask(task); 
       submitTask2(task); 
      } 
     }); 
    } 

    public void submitTask2(final SomeTask task) { 
     executor.submit(new Runnable(){ 
      public void run() { 
       doTask2(task); 
      } 
     }); 
    } 

    public void shutdown() { 
     executor.shutdown(); 
    } 
} 

如果您想为任务单独排队,只需创建并使用两个不同的执行程序即可。