1

我想用ExecutorService及其函数invokeAll用Java编写程序。我的问题是:invokeAll函数是否同时解决任务?我的意思是,如果我有两个处理器,那么同时会有两名工人?因为我不能让它正确地缩放。这需要在同一时间完成的问题,如果我给newFixedThreadPool(2)或1Java ExecutorService - 缩放

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>(); 
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>(); 
for(PartialSolution ps : wp) 
{ 
    tasks.add(new Map(ps, keyWords)); 
} 
list = executor.invokeAll(tasks); 

Map是实现Callablewp是部分解决方案的一个载体,保存在不同时代的一些信息类的类。

它为什么不缩放?可能是什么问题呢?

这是PartialSolution的代码:

import java.util.HashMap; 
import java.util.Vector; 

public class PartialSolution 
{ 
    public String fileName;//the name of a file 
    public int b, e;//the index of begin and end of the fragment from the file 
    public String info;//the fragment 
    public HashMap<String, Word> hm;//here i retain the informations 
    public HashMap<String, Vector<Word>> hmt;//this i use for the final reduce 

    public PartialSolution(String name, int b, int e, String i, boolean ok) 
    { 
     this.fileName = name; 
     this.b = b; 
     this.e = e; 
     this.info = i; 
     hm = new HashMap<String, Word>(); 
     if(ok == true) 
     { 
      hmt = new HashMap<String, Vector<Word>>(); 
     } 
     else 
     { 
      hmt = null; 
     }  
    } 
} 

的这对地图的代码:

public class Map implements Callable<PartialSolution> 
{ 
    private PartialSolution ps; 
    private Vector<String> keyWords; 

    public Map(PartialSolution p, Vector<String> kw) 
    { 
     this.ps = p; 
     this.keyWords = kw; 
    } 

    @Override 
    public PartialSolution call() throws Exception 
    { 
     String[] st = this.ps.info.split("\\n"); 
     for(int j = 0 ; j < st.length ; j++) 
     { 
      for(int i = 0 ; i < keyWords.size() ; i++) 
      { 
       if(keyWords.elementAt(i).charAt(0) != '\'') 
       { 
        int k = 0; 
        int index = 0; 
        int count = 0; 

        while((index = st[j].indexOf(keyWords.elementAt(i), k)) != -1) 
        { 
         k = index + keyWords.elementAt(i).length(); 
         count++; 
        } 
        if(count != 0) 
        { 
         Word wr = this.ps.hm.get(keyWords.elementAt(i)); 
         if(wr != null) 
         { 
          Word nw = new Word(ps.fileName); 
          nw.nrap = wr.nrap + count; 
          nw.lines = wr.lines; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
         else 
         { 
          Word nw = new Word(ps.fileName); 
          nw.nrap = count; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
        } 
       } 
       else 
       { 
        String regex = keyWords.elementAt(i).substring(1, keyWords.elementAt(i).length() - 1); 
        StringBuffer sb = new StringBuffer(regex); 
        regex = sb.toString(); 
        Pattern pt = Pattern.compile(regex); 
        Matcher m = pt.matcher(st[j]); 
        int count = 0; 
        while(m.find()) 
        { 
         count++; 
        } 
        if(count != 0) 
        { 
         Word wr = this.ps.hm.get(keyWords.elementAt(i)); 
         if(wr != null) 
         { 
          Word nw = new Word(this.ps.fileName); 
          nw.nrap = wr.nrap + count; 
          nw.lines = wr.lines; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
         else 
         { 
          Word nw = new Word(this.ps.fileName); 
          nw.nrap = count; 
          int grep = count; 
          while(grep > 0) 
          { 
           nw.lines.addElement(ps.b + j); 
           grep--; 
          } 
          this.ps.hm.put(keyWords.elementAt(i), nw); 
         } 
        } 
       } 
      } 
     } 
     this.ps.info = null; 
     return this.ps; 
    } 
} 

所以在地图我从该片段的每一行,并搜索每个表达式数的外观,我也保存线的数量。在我处理完所有片段后,在同一个PartialSolution中,我将这些信息保存在散列图中并返回新的PartialSolution。在下一步中,我将PartialSolutions与相同的fileName结合起来,并将它们引入Callable类Reduce中,它与map相同,不同之处在于它可以进行其他操作,但也返回PartialSolution。

这是跑地图的任务代码:

List<Future<PartialSolution>> list = new ArrayList<Future<PartialSolution>>(); 
Collection<Callable<PartialSolution>> tasks = new ArrayList<Callable<PartialSolution>>(); 
for(PartialSolution ps : wp) 
{ 
    tasks.add(new Map(ps, keyWords)); 
}  
list = executor.invokeAll(tasks); 

在任务我创建类型地图的任务,并在列表中我得到他们。我不知道如何读取JVM线程转储。我希望能够提供给我的信息。如果有帮助,我在NetBeans 7.0.1中工作。

谢谢 亚历

+0

多少任务,你呢?他们做什么?有很多I/O? – Thilo

+0

我的任务是那些可调用的类,它们使用PartialSolution,它有一些文本并计算一个单词出现文本和行的次数。 PartialSolution实际上是文本的一部分,我希望为每个部分获取这些信息,然后将它们与另一个名为Reduce的Callable类联合起来。我想同时处理这些零件。取决于我拥有的处理器数量。 I/O将在最后,当我将所有任务和10个部分联合起来说,并将有一个关于该文件的所有信息。 Google使用的是MapReduce。 –

+0

我想知道的是,如果方法invokeAll,如果我创建了10个线程的ExcutorService,将同时解决10个任务或一次解决一个任务?在Map中我有一个构造函数,并实现了函数call(),该函数返回另一个PartialSolution,但是这次是正确的信息。还有一个问题,如果我说list.get(i).get()这将返回PartialSolution解决之后赖特? 我真的不明白为什么不改善时间,如果我使用2线程而不是1.为什么它不缩放赖特? –

回答

2

我想知道的是如果方法的invokeAll,如果我有10个线程创建的ExcutorService,将解决10个任务在同一时间或将解决一次一个?

如果您向10个线程的ExecutorService提交10个任务,它将同时运行它们。他们是否可以完全平行并独立于彼此取决于他们在做什么。但他们每个人都有自己的线索。

另一个问题,如果我说list.get(i).get()这将返回PartialSolution解决后?

是的,它会阻塞直到计算完成(如果还没有完成)并返回结果。

我真不明白的时候为什么不提高,如果我用2个线程,而不是1

我们需要看到更多的代码。他们是否在某些共享数据上同步?这些任务需要多长时间?如果他们很短,你可能不会注意到任何差异。如果它们花费更长时间,请查看JVM线程转储以验证它们全部正在运行。

+2

+1。但有一个错误:invokeAll返回已完成期货的清单。换句话说:只有当所有任务完成后才会返回。 –

0

如果使用两个线程创建线程池,那么两个任务将同时运行。

我发现有两件事情可能导致两个线程花费与一个线程相同的时间。

如果只有一个Map任务占用大部分时间,那么额外的线程将不会使该任务运行得更快。它不能比最慢的工作完成得更快。

另一种可能性是您的地图任务经常从共享矢量中读取。这可能会导致足够的争用来取消拥有两个线程的收益。

你应该在jvisualvm中看看每个线程正在做什么。

+0

我已经安装了VisualVM,但我不知道如何使用它,我的意思是我不知道要看,如何读取数据。请帮助一些。 –

+0

我已经做了这个步骤:Profiler - > CPU - >右键单击然后线程转储...但我不明白的事情。 –

+0

@ StanciuAlexandru-Marian我建议使用ThreadFactory命名你的线程有意义的东西。然后找到线程列表中的线程。然后检查代码运行时每个线程的状态如何变化。这会给你一个每个线程正在做多少工作的指示。如果一个线程正在等待,则可以执行线程转储以查看它正在等待什么。 –

0

Java 8在Executors - newWorkStealingPool中引入了更多的API来创建工作抢占池。您不必创建RecursiveTaskRecursiveAction,但仍可以使用ForkJoinPool

public static ExecutorService newWorkStealingPool() 

创建使用所有可用的处理器作为其目标并行水平工作窃取线程池。

默认情况下,它将以CPU内核数量作为并行性参数。如果您有核心CPU,则可以有8个线程处理工作任务队列。

Work stealing of idle worker threads from busy worker threads improves overall performance. Since task queue is unbounded in nature, this ForkJoinPool is recommended for the tasks executing in short time intervals.

无论ExecutorServiceForkJoinPoolThreadPoolExecutor表现会好,如果你没有共享数据和共享锁定(同步)和线程间通信。如果任务队列中的所有任务都是相互独立的,则性能会得到提高。

ThreadPoolExecutor构造定制和任务控制工作流程:

ThreadPoolExecutor(int corePoolSize, 
         int maximumPoolSize, 
         long keepAliveTime, 
         TimeUnit unit, 
         BlockingQueue<Runnable> workQueue, 
         ThreadFactory threadFactory, 
         RejectedExecutionHandler handler) 

看一看相关SE的问题:

How to properly use Java Executor?

Java's Fork/Join vs ExecutorService - when to use which?