2017-06-29 36 views
0

有没有办法通过一个庞大的数据库并为条目平台应用一些作业? 我试着用ExecutorService的,但我们必须为了知道池大小关闭()...Java - ExecutorService具有最大大小

所以我的最好的解决办法是:

import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

public class TestCode 
{ 
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest) 
{ 
    return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29"); 
} 

public static void main(String args[]) throws Exception 
{ 
    int dbOffset = 0; 
    int nbOfArticlesPerRequest = 100; 
    int MYTHREADS = 10; 
    int loopIndex = 0; 
    boolean bContinue=true; 
    Runnable worker; 



    while(bContinue) // in this loop we'll constantly fill the pool list 
    { 
     loopIndex++; 
     ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN... 

     List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
     for(String id: ids) { 
      worker = new MyRunnable(id); 
      executor.execute(worker); 
     } 

     executor.shutdown(); 
     while (!executor.isTerminated()) { 
      System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
        " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
      ); 
      TimeUnit.MILLISECONDS.sleep(500); 
     } 

     if(loopIndex>=3) { 
      System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
      bContinue = false; 
     } 
     dbOffset+=nbOfArticlesPerRequest; 
    } 
} 



public static class MyRunnable implements Runnable { 

    private final String id; 

    MyRunnable(String id) { 
     this.id = id; 
    } 

     @Override 
     public void run() 
     { 
      System.out.println("Thread '"+id+"' started"); 
      try { 
       TimeUnit.MILLISECONDS.sleep(2000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      System.out.println("Thread '"+id+"' stopped"); 
     } 
    } 
} 

这是工作正常,但美中不足的是,在循环的每一端,我都需要等待最后一个线程完成。

例如为:当只有3个线程正在运行...

我做了如下为了解决这个问题,但就是“安全” /是否正确?

顺便说一句:有什么方法可以知道队列中有多少个任务/线程?

int dbOffset = 0; 
    int nbOfArticlesPerRequest = 5; //100; 
    int MYTHREADS = 2; 
    int loopIndex = 0; 

    ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE** 
     while(bContinue) // in this loop we'll constantly fill the pool list 
     { 
      loopIndex++; 

      List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
      for(String id: ids) { 
        worker = new MyRunnable(id); 
        executor.execute(worker); 
      } 

      while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) { 
       System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
         " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
       ); 
       TimeUnit.MILLISECONDS.sleep(500); 
      } 

      if(loopIndex>=3) { 
       System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
       bContinue = false; 
      } 
      dbOffset+=nbOfArticlesPerRequest; 
     } 

    executor.shutdown(); 
    // Wait until all threads are finish 
    while (!executor.isTerminated()) { 
     System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+ 
       " - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size() 
     ); 
     TimeUnit.MILLISECONDS.sleep(500); 
    } 

编辑:

我尝试推出1级或10数以百万计的任务,所以(我认为),我不能把他们都在排队......这就是为什么我使用一个全球性的执行为了能够在队列中总是有一些线程(因为我不能关闭执行程序,否则它不再可用)。

最新代码版本:

int dbOffset = 0; 
    int nbOfArticlesPerRequest = 5; //100; 
    int MYTHREADS = 2; 
    int loopIndex = 0; 

    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE** 
     while(bContinue) // in this loop we'll constantly fill the pool list 
     { 
      loopIndex++; 

      List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number) 
      for(String id: ids) { 
        worker = new MyRunnable(id); 
        executorPool.execute(worker); 
      } 

      while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2)) 
      { 
       System.out.println("Pool size is now " + executorPool.getActiveCount()+ 
             " - queue size: "+ executorPool.getQueue().size() 
       ); 

       if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) { 
        System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue"); 
        break; 
       } 

       TimeUnit.MILLISECONDS.sleep(2000); 
      } 

      if(loopIndex>=3) { 
       System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n"); 
       bContinue = false; 
      } 
      dbOffset+=nbOfArticlesPerRequest; 
     } 

    executorPool.shutdown(); 
    // Wait until all threads are finish 
    while (!executorPool.isTerminated()) { 
     System.out.println("Pool size is now " + executorPool.getActiveCount()+ 
       " - queue size: "+ executorPool.getQueue().size() 
     ); 
     TimeUnit.MILLISECONDS.sleep(500); 
    } 

在此先感谢

+0

可以使用的invokeAll()来等待线程的完成。参考:https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/36699136#36699136 –

回答

0

更新

现在很明显,我认为你的主要关注它,你不能在提交千万任务一旦。

不要害怕,你可以把它们全部提交给执行者。并行运行的实际任务数量受底层线程池大小的限制。也就是说,如果你的池大小为2,那么当时只有两个任务正在执行,其余的则在队列中等待空闲线程。

默认情况下,Executors.newFixedThreadPool()创建一个队列大小为Integer.MAX_VALUE的执行程序,因此您的数百万个任务将适合此处。


您可以使用ExecutorService.submit()方法返回Future。然后,您可以检查未来任务的状态(即使用isDone(),isCancelled()方法)。

执行程序通常是您不希望显式关闭并且在整个应用程序生命周期中存在的东西。通过这种方法,您无需关闭以了解有多少任务正在处理中。

List<Future<?>> tasks = new ArrayList<>(); 
for (String id : ids) { 
    Future<?> task = executorService.submit(() -> { 
     // do work 
    }); 
    tasks.add(task); 
} 

long pending = tasks.stream().filter(future -> !future.isDone()).count(); 
System.out.println(pending + " task are still pending"); 

而且,请注意,任务和线程是不能互换的条款。在你的情况下,执行者具有固定的线程数。您可以提交比此更多的任务,但其余部分将位于执行程序队列中,直到有空闲线程运行任务为止。

+0

可能是一个好主意......然后我只需要添加一个“等待循环”,以便在少于X任务运行时分配更多任务... – Bast

+0

与@Pavan相同的问题,与我的解决方案相比,您的解决方案有什么优势(请参阅EDIT之后的最新代码)? – Bast

+0

好吧,现在我明白了你的观点并更新了答案。我认为你正在努力模拟Executors已经提供的东西--_queue_。 –

0

ExecuterService允许您调用可并行运行的任务列表,并在结果可用时返回结果。

在你的代码使用

worker = new MyRunnable(id); 
executor.execute(worker); 

相反Runnable,它能够更好地使用Callable在这种使用情况下,那么你可以提交可调用的列表执行单一的API而不是为循环。

List<Callable> workers = new ArrayList<>(); 
workers.add(new MyCallable(id)) // this is just for example 
workers.add(new MyCallable(id)) 
workers.add(new MyCallable(id)) 

List<Future<Boolean>> futures = executor.invokeAll(workers); // this will execute all worker tasks parallely and return you future object list using which you can see whether worker thread is completed or not and also the what is the return value. 

注意未来对象的get方法阻塞调用

+0

invokeAll也是阻塞的,所以最初的问题(需要等待每个循环的最后一个线程)没有解决。 :) – Bast

+0

@Bast - 根据我的理解,invokeAll不阻止呼叫。 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#invokeAll(java.util.Collection) – Pavan

+0

你是对的,只有future.get()正在阻止...我可以使用您的解决方案作为由@DavidSiro建议的解决方案... – Bast

0

你不需要知道线程池的大小,检查任务的完成在ExecutorService。提交任务后,您可以删除您的代码。

选项1:

  1. newWorkStealingPool从执行人替换的ThreadPoolExecutor。

    使用所有可用的处理器创建工作线程池作为其目标并行级别。

    它可以更好地利用ExecutorService中的线程。

    ExecutorService executor = Executors.newWorkStealingPool(); 
    
  2. 使用invokeAll

选项2:(有用的,如果你知道任务提前数)

使用CountDownLatch并初始化计数器任务数是提交。

更多参考:

wait until all threads finish their work in java

How to properly shutdown java ExecutorService

+0

是的,但因为它是一个while循环我想动态添加新线程,以便始终有一些在“队列” ...实际上使用getActiveCount()更正确(代码更新) - 我现在甚至切换到我的本地代码中的ThreadPoolExecutor – Bast

+0

请注意,在我的第二个代码部分(即:“解决方案”)中,ExecutorService是全局的,所以它不能被关闭,否则它不再可用 – Bast

+0

在while循环之外,可以按照上面引用的顺序使用shutdown,sbutdownNow,awaitTermination API来保持关闭代码。 –