2011-07-20 54 views
1

我将正在运行的生产者/使用者示例从线程/可运行转换为Executor/Callable/BlockingQueues并使用Poison Pill终止模式。即使所有线程完成后,应用程序仍会挂起几分钟

如果您运行下面的程序,即使每个线程都已完成,它仍会挂起几分钟。 jstack显示在与应用程序看似不相关的队列上阻塞了大量线程。

"pool-1-thread-10" prio=5 tid=10b08d000 nid=0x10d91c000 waiting on condition [10d91b000] 
    java.lang.Thread.State: TIMED_WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <7f3113510> (a java.util.concurrent.SynchronousQueue$TransferStack) 
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198) 
    at java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:424) 
    at java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:323) 
    at java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:874) 
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907) 
    at java.lang.Thread.run(Thread.java:680) 

我找不出为什么应用程序挂起。任何帮助表示赞赏。 谢谢

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.Future; 
import java.util.concurrent.LinkedBlockingQueue; 

public class ProducersConsumers { 
    private LinkedBlockingQueue<Item> queue = new LinkedBlockingQueue<Item>(); 
    private static final ExecutorService executorPool = Executors.newCachedThreadPool(); 
    private Random randGenerator = new Random(System.currentTimeMillis()); 

    private class Item { 
     private boolean done = false; 
     private String message; 

     private Item(boolean done) { 
      this.done = done; 
     } 

     private Item(String message) { 
      this.message = message; 
     } 

     public boolean isDone() { 
      return done; 
     } 

     public String getMessage() { 
      return message; 
     } 
    } 

    private class Producer implements Callable<Long> { 
     private final int id; 
     private Integer numOfMessages; 

     private Producer(int id, int numOfMessages) { 
      this.id = id; 
      this.numOfMessages = numOfMessages; 
     } 

     @Override 
     public Long call() throws Exception { 
      long totalTime = 0; 
      while (numOfMessages > 0) { 
       String message; 
       synchronized (numOfMessages) { 
        long starttime = System.nanoTime(); 
        int msgLength = randGenerator.nextInt(20000); 
        StringBuilder sb = new StringBuilder(msgLength); 
        for (int a = 0; a < msgLength; a++) { 
         sb.append((char) ('a' + randGenerator.nextInt(26))); 
        } 
        message = sb.toString(); 
        long endtime = System.nanoTime(); 
        totalTime += endtime - starttime; 
       } 
       numOfMessages--; 
       queue.put(new Item(message)); 
      } 
      System.out.println("-------------Producer " + id + " is done."); 
      queue.put(new Item(true)); 
      return totalTime; 
     } 
    } 

    private class Consumer implements Callable<Long> { 
     private String monitor = "monitor"; 
     private final int id; 

     private Consumer(int id) { 
      this.id = id; 
     } 

     @Override 
     public Long call() throws Exception { 
      long totalTime = 0; 
      while (true) { 
       Item item = queue.take(); 
       if (item.isDone()) { 
        break; 
       } 
       synchronized (monitor) { 
        long starttime = System.nanoTime(); 
        StringBuilder sb = new StringBuilder(item.getMessage()); 
        sb = sb.reverse(); 
        String message = sb.toString(); 
        long endtime = System.nanoTime(); 
        totalTime += endtime - starttime; 
       } 
      } 
      System.out.println("+++++++++++++Consumer " + id + " is done."); 
      return totalTime; 
     } 
    } 

    public void begin(int threadCount) throws InterruptedException, ExecutionException { 
     Collection<Producer> producers = new ArrayList<Producer>(); 
     for (int i = 0; i < threadCount; i++) { 
      producers.add(new Producer(i, randGenerator.nextInt(5))); 
     } 
     Collection<Consumer> consumers = new ArrayList<Consumer>(); 
     for (int i = 0; i < threadCount; i++) { 
      consumers.add(new Consumer(i)); 
     } 
     try { 
      long starttime = System.nanoTime(); 
      List<Future<Long>> producerFutureList = executorPool.invokeAll(producers); 
      List<Future<Long>> consumerFutureList = executorPool.invokeAll(consumers); 
      long producerTotalTime = 0; 
      long consumerTotalTime = 0; 

      for (Future<Long> future : producerFutureList) { 
       producerTotalTime += future.get(); 
      } 
      for (Future<Long> future : consumerFutureList) { 
       consumerTotalTime += future.get(); 
      } 
      long mainThreadTotalTime = System.nanoTime() - starttime; 

      System.out.println("producerTotalTime " + producerTotalTime); 
      System.out.println("consumerTotalTime " + consumerTotalTime); 
      System.out.println("mainThreadTotalTime " + mainThreadTotalTime); 
      System.out.println("Difference   " + (producerTotalTime + consumerTotalTime - mainThreadTotalTime)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
      throw e; 
     } catch (ExecutionException e) { 
      e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 
      throw e; 
     } 

    } 

    public static void main(String[] args) throws ExecutionException, InterruptedException { 
     ProducersConsumers prodcon = new ProducersConsumers(); 
     prodcon.begin(20); 
    } 
} 

回答

4

当你完成它时你应该关闭ExecutorService。在程序结束时调用executorPool.shutdown()。

+0

ExecutorService的文档没有说明您完成后需要关闭,但这些示例都是这样。 – ptomli

+0

就是这样!调用shutdown()可以解决问题。非常感谢你。 – jabawaba

+0

@ptomli我知道你发表评论已经很长时间了。起初,我想到了同样的事情。但是最近,我在[docs](http://docs.oracle.com/javase/6/docs/api/java/util/concurrent/ExecutorService.html)中找到了一些支持“shutdown”的东西。他们声明:*一个未使用的ExecutorService应该关闭以允许其资源的回收。*因此,当你完成你的'ExecutorService'时,你应该总是调用'shutdown'。 –

0

您似乎正在使用共享资源,特别是在同步块之外使用numOfMessages

while (numOfMessages > 0) { 
    // blah 
    synchronized (numOfMessages) { 
     // blah 
    } 
} 

我不认为这是你的问题的原因,但它肯定是非线程安全的。这是一个典型的检查后行为场景。为什么这是Not Good(TM),请参阅Java Concurrency in PracticeEffective Java

+0

numOfMessages在此处不作为同步监视器。正如你注意到的,numOfMessages是对象级别的,因此没有人会在一个线程之外访问它。然而,对于同步块的原因是向JVM表明它不应当在块内执行时中断此线程,以便记录准确的总执行时间。如果线程由于加载而换出,则记录的处理时间将包括等待状态阶段,因此不准确。感谢您提出 – jabawaba

+0

或者您确定这是真的吗?我写了一个示例程序,似乎即使在同步块内也有上下文切换。 – duduamar

+0

嗯。现在还不确定。我以为我在很多年前了解到这一点,但在您提到它之后,我无法验证它。有没有阻止调度程序在关键块中交换线程? – jabawaba

相关问题