2011-10-18 105 views
1

我有以下情况:Java线程等待值

为了运行一个算法,我必须运行多个线程和每个线程都设置一个实例变量x,它死前。问题是这些线程不会立即返回:

public Foo myAlgorithm() 
{ 
    //create n Runnables (n is big) 
    //start these runnables (may take long time do die) 

    //i need the x value of each runnable here, but they havent finished yet! 

    //get average x from all the runnables 

    return new Foo(averageX); 
} 

我应该使用等待通知吗?或者我应该只是嵌入一个while循环并检查终止?

谢谢大家!

回答

4

创建一些共享存储来保存来自每个线程的值x,或者只需存储总和即可。使用CountDownLatch等待线程终止。每个线程完成后将调用CountDownLatch.countDown(),并且您的myAlgorithm方法将使用CountDownLatch.await()方法等待它们。

编辑:下面是我建议的方法的完整示例。它创建了39个工作线程,每个线程都将一个随机数添加到共享总和中。当所有工人都完成后,平均数就会被计算和打印。

import java.util.Random; 
import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.atomic.AtomicInteger; 

class Worker implements Runnable { 

    private final AtomicInteger sum; 
    private final CountDownLatch latch; 

    public Worker(AtomicInteger sum, CountDownLatch latch) { 
     this.sum = sum; 
     this.latch = latch; 
    } 

    @Override 
    public void run() { 
     Random random = new Random(); 

     try { 
      // Sleep a random length of time from 5-10s 
      Thread.sleep(random.nextInt(5000) + 5000); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     // Compute x 
     int x = random.nextInt(500); 

     // Add to the shared sum 
     System.out.println("Adding " + x + " to sum"); 
     sum.addAndGet(x); 

     // This runnable is finished, so count down 
     latch.countDown(); 
    } 
} 

class Program { 

    public static void main(String[] args) { 
     // There will be 39 workers 
     final int N = 39; 

     // Holds the sum of all results from all workers 
     AtomicInteger sum = new AtomicInteger(); 
     // Tracks how many workers are still working 
     CountDownLatch latch = new CountDownLatch(N); 

     System.out.println("Starting " + N + " workers"); 

     for (int i = 0; i < N; i++) { 
      // Each worker uses the shared atomic sum and countdown latch. 
      Worker worker = new Worker(sum, latch); 

      // Start the worker 
      new Thread(worker).start(); 
     } 

     try { 
      // Important: waits for all workers to finish. 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     // Compute the average 
     double average = (double) sum.get()/(double) N; 

     System.out.println(" Sum: " + sum.get()); 
     System.out.println("Workers: " + N); 
     System.out.println("Average: " + average); 
    } 

} 

输出应该是这样的:

Starting 39 workers 
Adding 94 to sum 
Adding 86 to sum 
Adding 454 to sum 
... 
... 
... 
Adding 358 to sum 
Adding 134 to sum 
Adding 482 to sum 
    Sum: 10133 
Workers: 39 
Average: 259.8205128205128 

编辑:只是为了好玩,这里是一个使用ExecutorServiceCallableFuture一个例子。

import java.util.ArrayList; 
import java.util.Collection; 
import java.util.List; 
import java.util.Random; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutionException; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Future; 
import java.util.concurrent.ScheduledThreadPoolExecutor; 

class Worker implements Callable<Integer> { 

    @Override 
    public Integer call() throws Exception { 
     Random random = new Random(); 

     // Sleep a random length of time, from 5-10s 
     Thread.sleep(random.nextInt(5000) + 5000); 

     // Compute x 
     int x = random.nextInt(500); 
     System.out.println("Computed " + x); 

     return x; 
    } 

} 

public class Program { 

    public static void main(String[] args) { 
     // Thread pool size 
     final int POOL_SIZE = 10; 

     // There will be 39 workers 
     final int N = 39; 

     System.out.println("Starting " + N + " workers"); 

     // Create the workers 
     Collection<Callable<Integer>> workers = new ArrayList<Callable<Integer>>(N); 

     for (int i = 0; i < N; i++) { 
      workers.add(new Worker()); 
     } 

     // Create the executor service 
     ExecutorService executor = new ScheduledThreadPoolExecutor(POOL_SIZE); 

     // Execute all the workers, wait for the results 
     List<Future<Integer>> results = null; 

     try { 
      // Executes all tasks and waits for them to finish 
      results = executor.invokeAll(workers); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
      return; 
     } 

     // Compute the sum from the results 
     int sum = 0; 

     for (Future<Integer> future : results) { 
      try { 
       sum += future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); return; 
      } catch (ExecutionException e) { 
       e.printStackTrace(); return; 
      } 
     } 

     // Compute the average 
     double average = (double) sum/(double) N; 

     System.out.println("   Sum: " + sum); 
     System.out.println("  Workers: " + N); 
     System.out.println("  Average: " + average); 
    } 

} 

输出应该是这样的:

Starting 39 workers 
Computed 419 
Computed 36 
Computed 338 
... 
... 
... 
Computed 261 
Computed 354 
Computed 112 
     Sum: 9526 
    Workers: 39 
    Average: 244.25641025641025 
+0

令人惊叹!我会保存这个参考。为了玩俄罗斯方块,我编码遗传算法,所有的代理商必须完成他们的游戏,以便相互配合,所以人口可以发展。每个游戏都将运行在不同的线程中,这要感谢您的时间,这个java.util.concurrent API是一个生命保护程序! – Fernando

+0

还有一个问题:哪种方式似乎更快,CountDownLatch或ThreadedPool?还是根本没有区别?谢谢! – Fernando

+0

在我提供的例子中,CountDownLatch速度更快。原因是在CountDownLatch示例中,所有线程都是一次生成的。在ExecutorService示例中,最多可以同时运行10个线程,因为那是我选择的'POOL_SIZE'。如果在ExecutorService示例中将POOL_SIZE设置为39,则结果应该与CountDownLatch示例几乎相同。 –

1

您可以让以及所有相关的东西,如ThreadPools,Executors等知道。Teaser:A Future是一个带返回值的线程。

+0

*初始化为N的CountDownLatch可用于使一个线程等待,直到N个线程有comp说出了一些动作,或者某些动作已经完成了N次*。那正是我需要的!感谢你们! – Fernando

0

使用ExecutorService,并提交每个任务(为Callable)将其

,你会得到一个未来提交

每个任务
List<Future<ResultType>> results = exec.invokeAll(tasks);//tasks is a set of Callable<ResultType> 
//invokeAll blocks untill all tasks are finished 
for(Future<ResultType> f:results){ 
    ResultType x=f.get();//loop over Futures to get the result 
    //do something with x 
} 
+0

哼哼比CountDownLatch的东西更简单。期待这一点,谢谢! – Fernando