2012-12-07 31 views
4

我想创建一个Erastosthenes Java程序的简单并行Sieve,至少比我下面发布的串行版本更有效一些。Java多线程 - Eratosthenes Sieve的基本并行示例

public void runEratosthenesSieve(int upperBound) { 
     int upperBoundSquareRoot = (int) Math.sqrt(upperBound); 
     boolean[] isComposite = new boolean[upperBound + 1]; 
     for (int m = 2; m <= upperBoundSquareRoot; m++) { 
      if (!isComposite[m]) { 
        System.out.print(m + " "); 
       int threads=4; 
       for (int n=1; n<=threads; n++) { 
        int job; 
        if (n==1) {job = m * m;} else {job = (n-1)*upperBound/threads;} 
        int upToJob = n*upperBound/threads; 
        for (int k = job; k <= upToJob; k += m) 
        { 
         isComposite[k] = true; 
        }    
       } 
      } 
     } 
     for (int m = upperBoundSquareRoot; m <= upperBound; m++) 
      if (!isComposite[m]) 
        System.out.print(m + " "); 
    } 

我已经创建了一个循环来分割4个线程的工作。虽然我不知道如何从中创建实际的线程代码。如何发送变量并为每个线程启动4个线程。

+1

您可能想查看[Fork/Join](http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)框架。 –

+0

我已更新我的问题。 –

回答

4

我可以提出以下解决方案:有4个工作线程和1个主线程。工作线程从队列中获取作业。工作基本上是3个数字:从,到,一步。主人的意思,而必须等到所有线程完成。完成后,它将搜索下一个素数并创建4个作业。主人和工人之间的同步可以通过使用Semaphore来实现:主人尝试获得4个许可证,而每个工作人员在完成时释放1个许可证。

public class Sieve { 

    // Number of workers. Make it static for simplicity. 
    private static final int THREADS = 4; 

    // array must be shared between master and workers threads so make it class property. 
    private boolean[] isComposite; 
    // Create blocking queue with size equal to number of workers. 
    private BlockingQueue<Job> jobs = new ArrayBlockingQueue<Job>(THREADS); 
    private Semaphore semaphore = new Semaphore(0); 
    // Create executor service in order to reuse worker threads. 
    // we can use just new Thread(new Worker()).start(). But using thread pools more effective. 
    private ExecutorService executor = Executors.newFixedThreadPool(THREADS); 

    public void runEratosthenesSieve(int upperBound) { 
     int upperBoundSquareRoot = (int) Math.sqrt(upperBound); 
     isComposite = new boolean[upperBound + 1]; 

     // Start workers. 
     for (int i = 0; i < THREADS; i++) { 
      executor.submit(new Worker()); 
     } 
     for (int m = 2; m <= upperBoundSquareRoot; m++) { 
      if (!isComposite[m]) { 
       System.out.print(m + " "); 
       for (int n=1; n<= THREADS; n++) { 
        int from; 
        if (n == 1) { 
         from = m * m; 
        } else { 
         from = (n-1)*upperBound/THREADS; 
        } 
        Job job = new Job(from, n*upperBound/threads, m); 
        // Submit job to queue. We don't care which worker gets the job. 
        // Important only that only 1 worker get the job. But BlockingQueue does all synchronization for us. 
        jobs.put(job); 
       } 
       // Wait until all jobs are done. 
       semaphore.acquire(THREADS); 
      } 
     } 
     for (int i = 0; i < n; i++) { 
      // put null to shutdown workers. 
      jobs.put(null); 
     } 
     for (int m = upperBoundSquareRoot; m <= upperBound; m++) { 
      if (!isComposite[m]) { 
       System.out.print(m + " "); 
      } 
     } 
    } 

    private class Job { 
     public int from, to, step; 

     public Job(int from, int to, int step) { 
      this.from = from; 
      this.to = to; 
      this.step = step; 
     } 
    } 

    private Worker implements Runnable { 
     while (true) { 
      Job job = jobs.take(); 
      // null means workers must shutdown 
      if (job == null) { 
       return; 
      } 
      for (int i = job.from; i <= job.to; i += job.step) { 
       isComposite[i] = true; 
      } 
      // Notify master thread that a job was done. 
      semaphore.release(); 
     } 
    } 
} 
+0

巨大的感谢!正是我在找什么! –

+0

我唯一不知道的是jobs.put()和jobs.take()方法是如何工作的。 –

+0

阅读有关BlockingQueue的文档。基本上把增加的元素放入队列中,从队列或块中返回第一个元素,直到有人放入元素为止。 –