有没有办法通过一个庞大的数据库并为条目平台应用一些作业? 我试着用ExecutorService的,但我们必须为了知道池大小关闭()...Java - ExecutorService具有最大大小
所以我的最好的解决办法是:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class TestCode
{
private static List<String> getIds(int dbOffset, int nbOfArticlesPerRequest)
{
return Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", "24", "25", "26", "27", "28", "29");
}
public static void main(String args[]) throws Exception
{
int dbOffset = 0;
int nbOfArticlesPerRequest = 100;
int MYTHREADS = 10;
int loopIndex = 0;
boolean bContinue=true;
Runnable worker;
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // NOT IDEAL, BUT EXECUTORSERVICE CANNOT BE REUSED ONCE SHUTDOWN...
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}
executor.shutdown();
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
}
public static class MyRunnable implements Runnable {
private final String id;
MyRunnable(String id) {
this.id = id;
}
@Override
public void run()
{
System.out.println("Thread '"+id+"' started");
try {
TimeUnit.MILLISECONDS.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread '"+id+"' stopped");
}
}
}
这是工作正常,但美中不足的是,在循环的每一端,我都需要等待最后一个线程完成。
例如为:当只有3个线程正在运行...
我做了如下为了解决这个问题,但就是“安全” /是否正确?
顺便说一句:有什么方法可以知道队列中有多少个任务/线程?
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
ExecutorService executor = Executors.newFixedThreadPool(MYTHREADS); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executor.execute(worker);
}
while (!executor.isTerminated() && ((ThreadPoolExecutor) executor).getActiveCount() >= MYTHREADS) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
executor.shutdown();
// Wait until all threads are finish
while (!executor.isTerminated()) {
System.out.println("Pool size is now " + ((ThreadPoolExecutor) executor).getActiveCount()+
" - queue size: "+ ((ThreadPoolExecutor) executor).getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
编辑:
我尝试推出1级或10数以百万计的任务,所以(我认为),我不能把他们都在排队......这就是为什么我使用一个全球性的执行为了能够在队列中总是有一些线程(因为我不能关闭执行程序,否则它不再可用)。
最新代码版本:
int dbOffset = 0;
int nbOfArticlesPerRequest = 5; //100;
int MYTHREADS = 2;
int loopIndex = 0;
ThreadPoolExecutor executorPool = new ThreadPoolExecutor(MYCORES, MYCORES, 0L,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // **HERE IT WOULD BE A GLOBAL VARIABLE**
while(bContinue) // in this loop we'll constantly fill the pool list
{
loopIndex++;
List<String> ids = getIds(dbOffset, nbOfArticlesPerRequest); // getIds(offset, rows_number)
for(String id: ids) {
worker = new MyRunnable(id);
executorPool.execute(worker);
}
while (executorPool.getActiveCount() >= MYTHREADS || executorPool.getQueue().size()> Math.max(1, MYTHREADS -2))
{
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
if(executorPool.getQueue().size() <= Math.max(1, MYCORES-2)) {
System.out.println("Less than "+Math.max(1, MYCORES-2)+" threads in queue ---> fill the queue");
break;
}
TimeUnit.MILLISECONDS.sleep(2000);
}
if(loopIndex>=3) {
System.out.println("\nEnd the loop #"+loopIndex+" ===> STOOOP!\n");
bContinue = false;
}
dbOffset+=nbOfArticlesPerRequest;
}
executorPool.shutdown();
// Wait until all threads are finish
while (!executorPool.isTerminated()) {
System.out.println("Pool size is now " + executorPool.getActiveCount()+
" - queue size: "+ executorPool.getQueue().size()
);
TimeUnit.MILLISECONDS.sleep(500);
}
在此先感谢
可以使用的invokeAll()来等待线程的完成。参考:https://stackoverflow.com/questions/3269445/executorservice-how-to-wait-for-all-tasks-to-finish/36699136#36699136 –