0

我正在使用Spring 4.3.8.RELEASE和Java 7.我想创建一个线程工厂来帮助管理我的应用程序中的某些工作人员。我宣布我的线程工厂是这样的如何等待我的线程工厂完成其所有任务?

<bean id="myprojectThreadFactory" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory"> 
    <constructor-arg value="prefix-"/> 
</bean> 
<bean id="myprojectTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <property name="threadFactory" ref="myprojectThreadFactory"/> 
    <property name="corePoolSize" value="${myproject.core.thread.pool.size}" /> 
    <property name="maxPoolSize" value="${myproject.max.thread.pool.size}" /> 
</bean> 

但是,我在加入线程时遇到了问题。也就是说,我要等到所有的工作具有一定的任务,所以我有

m_importEventsWorker.work(); 
    m_threadExecutor.shutdown(); 
    System.out.println("done."); 

在我的线程池,像这样

public void work(final MyWorkUnit pmyprojectOrg) 
{ 
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>(); 
    if (pmyprojectOrg != null) 
    { 
     processData(pmyprojectOrg.getmyprojectOrgId()); 
    } else { 
     allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens()); 
     // Cue up threads to execute 
     for (final MyWorkUnit myprojectOrg : allOrgs) 
     { 
      m_threadExecutor.execute(new Thread(new Runnable(){ 
       @Override 
       public void run() 
       { 
        System.out.println("started."); 
        processData(myprojectOrg.getmyprojectOrgId()); 
       } 
      })); 
     } // for 

然而,什么会打印出执行继续之前完成是

done. 
started. 
started. 

很清楚,我不是在等待。什么是正确的方式来等待我的线程完成工作?

+0

你想对线程做什么?他们正在处理相同的数据集还是分开的数据?你能描述一下流程吗? – jeorfevre

+0

线程在不同的数据集上有用。我想知道什么时候所有的线程都完成了他们的任务,但显然“关机”方法不是要走的路。 – Dave

+0

您可以调用'm_threadExecutir.submit(...)',并且该方法将返回一个'FutureTask'实例,您应该调用'FutureTask'的'get()'方法来等待任务完成。 – dabaicai

回答

0

由于我使用Spring的ThreadPoolTask​​Executor类,我发现低于适合我的需求......

protected void waitForThreadPool(final ThreadPoolTaskExecutor threadPoolExecutor) 
{ 
    threadPoolExecutor.setWaitForTasksToCompleteOnShutdown(true); 
    threadPoolExecutor.shutdown();  
    try { 
     threadPoolExecutor.getThreadPoolExecutor().awaitTermination(30, TimeUnit.SECONDS); 
    } catch (IllegalStateException e) { 
     e.printStackTrace(); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
} // waitForThreadPool 
0

A CountDownLatch用给定的计数初始化。该计数通过调用countDown()方法递减。等待此计数达到零的线程可以调用await()方法之一。调用await()会阻塞该线程,直到计数达到零。

您可以使用CountDownLatch主线程等待完成所有的task.You可以宣布CountDownLatch与大小在主线程中调用await()方法任务CountDownLatch latch = new CountDownLatch(3);的数量等,每个任务完成呼叫countDown()

public void work(final MyWorkUnit pmyprojectOrg) 
{ 
    final List<MyWorkUnit> allOrgs = new ArrayList<MyWorkUnit>(); 
    if (pmyprojectOrg != null) 
    { 
     processData(pmyprojectOrg.getmyprojectOrgId()); 
    } else { 
     allOrgs.addAll(m_myprojectSvc.findAllWithNonEmptyTokens()); 

     CountDownLatch latch = new CountDownLatch(allOrgs.size()); 

     // Cue up threads to execute 
     for (final MyWorkUnit myprojectOrg : allOrgs) 
     { 
      m_threadExecutor.execute(new Thread(new Runnable(){ 
       @Override 
       public void run() 
       { 
        System.out.println("started."); 
        processData(myprojectOrg.getmyprojectOrgId()); 
        latch.countDown(); 
       } 
      })); 
     } 
     //After for loop 
     latch.await();  

实施例:

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

公共类递减器实现Runnable {

CountDownLatch latch = null; 

public Decrementer(CountDownLatch latch) { 
    this.latch = latch; 
} 



public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 
+0

谢谢,但我不清楚我会如何将它应用于我题。你是说当我创建一个Runnable实例时,我应该创建你的“服务员”类吗? – Dave

+0

创建一个倒数闩锁实例并传递给每个可运行的实例并倒计时。调用工作调用后等待方法 –

+0

“m_threadExecutor.shutdown();”实际上是从JUnit测试中调用的,在测试中的b/c我想在thrads完成后检查一些条件。我想避免将代码添加到我的核心项目中以满足JUnit测试,但如果没有其他方法来执行此操作,我会考虑它。 – Dave

0

可以使用的ExecutorService创建一个固定的线程池,并检查池大小是否为空没有:

ExecutorService executor = Executors.newFixedThreadPool(50); 

如果使用此执行运行任务,并通过使用@Scheduled固定利率或FIXEDDELAY定期检查线程池的大小,你可以看到,如果他们完成与否。

ThreadPoolExecutor poolInfo = (ThreadPoolExecutor) executor; 
Integer activeTaskCount = poolInfo.getActiveCount(); 

if(activeTaskCount = 0) { 
    //If it is 0, it means threads are waiting for tasks, they have no assigned tasks. 
    //Do whatever you want here! 
} 
+0

这将告诉我在一个固定的时间点,如果我被证明,但我想实际上暂停程序执行,直到一切都完成。 – Dave