3

我正在阅读Java ForkJoin框架。在ForkJoinTask(例如RecursiveTask)的实施中,如果不直接呼叫invoke()还有什么额外的好处,但是要实例化ForkJoinPool并呼叫pool.invoke(task)?当我们将这两种方法称为invoke时究竟发生了什么?ForkJoinPool.invoke()和ForkJoinTask.invoke()或compute()

从源头上看,如果recursiveTask.invoke被调用,它将调用它的exec并最终以管理的线程池方式调用compute。因此,为什么我们有成语pool.invoke(task)更令人困惑。

我写了一些简单的代码来测试性能差异,但我没有看到任何。也许测试代码是错误的?请看下图:

public class MyForkJoinTask extends RecursiveAction { 

    private static int totalWorkInMillis = 20000; 
    protected static int sThreshold = 1000; 

    private int workInMillis; 


    public MyForkJoinTask(int work) { 
     this.workInMillis = work; 
    } 

    // Average pixels from source, write results into destination. 
    protected void computeDirectly() { 
     try { 

      ForkJoinTask<Object> objectForkJoinTask = new ForkJoinTask<>(); 
      Thread.sleep(workInMillis); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    @Override 
    protected void compute() { 
     if (workInMillis < sThreshold) { 
      computeDirectly(); 
      return; 
     } 

     int discountedWork = (int) (workInMillis * 0.9); 
     int split = discountedWork/2; 

     invokeAll(new MyForkJoinTask(split), 
       new MyForkJoinTask(split)); 
    } 

    public static void main(String[] args) throws Exception { 
     System.out.printf("Total work is %d in millis.%n", totalWorkInMillis); 
     System.out.printf("Threshold is %d in millis.%n", sThreshold); 

     int processors = Runtime.getRuntime().availableProcessors(); 
     System.out.println(Integer.toString(processors) + " processor" 
       + (processors != 1 ? "s are " : " is ") 
       + "available"); 

     MyForkJoinTask fb = new MyForkJoinTask(totalWorkInMillis); 

     ForkJoinPool pool = new ForkJoinPool(); 

     long startTime = System.currentTimeMillis(); 


     // These 2 seems no difference! 
     pool.invoke(fb); 
//  fb.compute(); 


     long endTime = System.currentTimeMillis(); 

     System.out.println("Took " + (endTime - startTime) + 
       " milliseconds."); 
    } 
} 

回答

4

RecursiveTask类的compute()方法仅仅是包含任务代码的抽象方法。它不使用池中的新线程,如果您正常调用它,则不会在池管理线程中运行。

叉式连接池中的invoke方法向池提交一个任务,然后该池开始在单独的线程上运行,调用该线程上的compute方法,然后等待结果。

你可以在java doc的RecursiveTask和ForkJoinPool中看到这个。 invoke()方法实际上执行任务,而compute()方法只是封装计算。

protected abstract V compute() 

该任务执行的主要计算。

而且ForkJoinPool

public <T> T invoke(ForkJoinTask<T> task) 

执行给定的任务,完成后返回其结果。 ...所以与计算方法

,你在做什么是运行在第一次调用compute外叉的加入池。您可以通过在compute方法内添加日志行来测试这一点。

System.out.println(this.inForkJoinPool()); 

您还可以检查它在同一个线程通过登录线程ID

System.out.println(Thread.currentThread().getId()); 

一旦你调用invokeAll运行,包括在调用子任务,然后在池中运行。请注意,它不一定在您拨打compute()之前创建的池中运行。你可以注释掉你的new ForkJoinPool()代码,它仍然会运行。有趣的是,java 7 doc说invokeAll()方法会抛出一个异常,如果它在一个池托管线程之外被调用,但java 8 doc不会。我没有在java 7中介绍过你的测试(只有8个)。但很有可能,在java 7中直接调用compute()时,您的代码会引发异常。

两个结果都返回相同时间的原因是毫秒不太不够准确记录开始在池进行管理,线程第一个线程,或者只是运行在现有的线程中的第一compute调用的差异。

的方式OCA/OCP学习指南由Sierra和贝茨建议您使用叉join框架是调用invoke()从池中。它清楚地说明了您正在使用的是哪个池,这也意味着您可以将多个任务提交到同一个池中,这样可以节省每次重新创建新池的开销。从逻辑上讲,将所有任务计算保留在池管理线程内(或者至少我认为是这样)也更清晰。

+0

感谢您的回答。但'RecursiveTask'也有一个继承的'invoke'方法,通常在其主'compute'方法内调用'invoke'。那是什么呢?还有,怎么没有性能差异? – Boyang

+0

所以我深入了解源代码。如果调用'recursiveTask.invoke',它将以托管线程池的方式调用它的'exec'并最终'compute'。在这种情况下,我更加困惑于成语'pool.invoke(task)'。为什么? – Boyang

+0

请参阅我的新的编辑 – Boyang