2015-11-22 68 views
3

我有一个Java多线程文件搜寻器,我正在做一个问题。我的问题是,我有一个workQueue,它是一个linkedBlockingQueue,它包含我想要用我的线程爬过的文件的名称,每个线程将从workQueue中take(),并在扫描整个文件的同时可能会将put()另一个文件名放入workQueue(这是一个依赖检查程序)。所以我从来没有真正确定什么时候工作全部完成,并且所有线程最终都会进入等待状态,当他们尝试从(最终)空workQueue中尝试take()时。初学者Java多线程问题

所以我想我的问题是,是否有一种有效的方法来终止所有的线程(当所有的线程都进入等待状态时)?目前我只在主线程上使用sleep(),然后在所有的工作线程中使用interrupt()

对不起,如果这个问题听起来很混乱。

回答

0

您可以使用下面的方法。如果需要,添加观察者模式。 或者简单地说 - 不是用死信息包发送信号,而是收集等待线程列表,然后中断()它们。

public class AccessCountingLinkedPrioQueue<T> { 

    private final LinkedBlockingQueue<T> mWrappingQueue     = new LinkedBlockingQueue<>(); 
    private final Object     mSyncLockObj     = new Object(); 

    private final int      mMaxBlockingThreads; 
    private final T       mDeathSignallingObject; 

    private volatile int     mNumberOfThreadsInAccessLoop = 0; 

    public AccessCountingLinkedPrioQueue(final int pMaxBlockingThreads, final T pDeathSignallingObject) { 
     mMaxBlockingThreads = pMaxBlockingThreads; 
     mDeathSignallingObject = pDeathSignallingObject; 
    } 

    public T take() throws InterruptedException { 
     final T retVal; 
     synchronized (mSyncLockObj) { 
      ++mNumberOfThreadsInAccessLoop; 
     } 
     synchronized (mWrappingQueue) { 
      if (mNumberOfThreadsInAccessLoop >= mMaxBlockingThreads && mWrappingQueue.isEmpty()) signalDeath(); 
      retVal = mWrappingQueue.take(); 
     } 
     synchronized (mSyncLockObj) { 
      --mNumberOfThreadsInAccessLoop; 
     } 
     return retVal; 
    } 

    private void signalDeath() { 
     for (int i = 0; i < mMaxBlockingThreads; i++) { 
      mWrappingQueue.add(mDeathSignallingObject); 
     } 
    } 

    public int getNumberOfThreadsInAccessLoop() { 
     return mNumberOfThreadsInAccessLoop; 
    } 
} 

class WorkPacket { 
    // ... your content here 
} 

class MultiThreadingBoss { 
    static public final WorkPacket DEATH_FROM_ABOVE = new WorkPacket(); 

    public MultiThreadingBoss() { 
     final int THREADS = 7; 
     final AccessCountingLinkedPrioQueue<WorkPacket> prioQ = new AccessCountingLinkedPrioQueue<>(THREADS, DEATH_FROM_ABOVE); 
     for (int i = 0; i < THREADS; i++) { 
      final ThreadedWorker w = new ThreadedWorker(prioQ); 
      new Thread(w).start(); 
     } 
    } 
} 

class ThreadedWorker implements Runnable { 
    private final AccessCountingLinkedPrioQueue<WorkPacket> mPrioQ; 

    public ThreadedWorker(final AccessCountingLinkedPrioQueue<WorkPacket> pPrioQ) { 
     mPrioQ = pPrioQ; 
    } 

    @Override public void run() { 
     while (true) { 
      try { 
       final WorkPacket p = mPrioQ.take(); 
       if (p == MultiThreadingBoss.DEATH_FROM_ABOVE) break; // or return 

       // ... do your normal work here 

      } catch (final InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    } 
} 
+0

真诚的thanx为downvotes人。但是在上帝的名下,请留下评论为什么这是不好的,所以我们都学习。 – JayC667

+1

不确定downvotes,但我会用ExecutorService管理线程。 –

1

我以前有这个问题,我发现的唯一办法就是到一个特殊的标记对象发送到BlockingQueue。当队列.take()的对象,如果这是标记,那么Thread自己结束。

我试过其他解决方案,比如唤醒线程并检测异常,但没有成功。

1

有一种叫做Poison Pill的模式,对此很有帮助。基本上,当生产者完成后,在队列中插入一个特殊值,告诉消费者停止。您可以为每个消费者插入一个药丸,或者,一旦消费者获得毒药丸,将其返回到下一个消费者的队列。因为它听起来就像你刚刚入队的字符串,像

public static final String POISON_PILL = "DONE"; 

或者在Java中8,使用Optional来包装你的价值观,那么已经不存在是避孕药。

BlockingQueue<Optional<...>> queue; 

另一种选择是使用ExecutorService(这实际上是由BlockingQueue支持),并利用每个文件作为自己的任务,然后使用executorService.shutdown()时,即可大功告成。与此相关的问题是,它会比你需要的代码更紧密地耦合你的代码,并且使得重用像数据库和HTTP连接之类的资源变得更加困难。

我会避免中断您的工作人员发出信号,因为这可能会导致阻止IO操作失败。