2017-01-06 154 views
0

假设我有一个很大的队列,就像10,000个对象一样。我想用5个工作线程创建一个线程池,每个工作线程从队列中移除一个项并对其进行处理,直到队列为空。队列中的多线程作业 - 作业太多?

我担心的是,通过使用我在不同地方看到的设置,我最终创建了10,000个工作,但是通过5名工作人员执行。我觉得这不是真正可扩展的 - 队列已经有10,000个项目,现在我在堆栈上还有10,000个作业(即使它们没有被正在执行,这看起来像是一个内存问题)。

这似乎是这个答案的建议:https://stackoverflow.com/a/9916299/774359 - 这是“// now submit our jobs”部分,让我很担心。是否有效地将队列倒入工作中是一个问题?

这里是什么,我至今一个简单的例子:)

在main(:

ExecutorService executor = Executors.newFixedThreadPool(5); 
while(!hugeQueue.isEmpty()) { 
    String work = hugeQueue.remove(); 
    System.out.println("Creating job for " + work); 
    Runnable worker = new Worker(work); 
    executor.execute(worker); 
} 

在Worker类:​​

public Worker(String itemFromQueue) { this.job = itemFromQueue; } 

@Override 
public void run() { 
    System.out.println("Working on " + this.itemFromQueue); 
    //Do actual work 
} 

hugeQueue包含万个的数字,我查看所有10,000个“创建作业”消息,然后查看所有10,000个“正在处理”消息。我认为如果只有5个职位同时创建,然后他们就可以开展工作 - 当一个线程打开时,它会创建另一个工作,然后工作。这样,堆栈中就不会有10,000个作业。我将如何实现这一目标?我是否正确地思考这个架构?


编辑,包括基于一个答案更新的信息:

@ seneque的代码没有编译通俗易懂,所以我做了一些细微的变化 - 不幸的是,这个输出是工人刚刚创建,而没有实际的工作。

在main():

int numOfThreads = 5; 
BlockingQueue<Integer> hugeQueue = new LinkedBlockingQueue<>(); 
for(int x = 0; x < 1000; x++) { hugeQueue.add(x); } 

ExecutorService executor = Executors.newFixedThreadPool(numOfThreads); 
LongRunningWorker longRunningWorker = new LongRunningWorker(); 

for(int i = 0; i < numOfThreads ; i++) { 
    System.out.println("Created worker #" + i); 
    executor.submit(longRunningWorker); 
} 
System.out.println("Done"); 

在LongRunningWorker:

public class LongRunningWorker implements Runnable { 
    BlockingQueue<Integer> workQueue; 
    void spiderExmaple(BlockingQueue<Integer> workQueue) { 
     this.workQueue = workQueue; 
    } 

    @Override 
    public void run() { 
     try { 
      while(workQueue.poll(3, TimeUnit.SECONDS) != null) { 
       Integer work = workQueue.remove(); 
       System.out.println("Working on " + work); 
       new Worker(work).run(); 
      } 
     } catch (InterruptedException e) { e.printStackTrace(); } 
    } 
} 

在工人:

public class Worker implements Runnable{ 
    Integer work; 
    Worker(Integer x) { this.work = x; } 

    @Override 
    public void run() { 
     System.out.println("Finished work on " + this.work); 

    } 
} 
+0

甲ThreadPoolExecutorService(其是在创建什么当调用Executors.newFixedThreadPool(5))具有内部队列。所以在这里,你从一个队列中取出另一个队列,这个队列将被5个线程读取 – seneque

+0

@seneque Right - 这意味着,对于我自己的10,000个项目队列,我将有效地创建第二个队列相同的大小,正确?对象是不同的,但我的问题是这是否是一个可行的解决方案给予双重内存要求 – Jake

+0

而不是,如果hugeQueue是一个阻塞队列,你可以让你的5线程引用你的队列并从队列中轮询。 – seneque

回答

1

一种解决方案将是有你五个直接轮询队列。

BlockingQueue<String> hugeQueue = ... 
ExecutorService executor = Executors.newFixedThreadPool(5); 
LongRunningWorker longRunningWorker = new LongRunningWorker(hugeQueue); 
for(int i = 0; i < 5 ; i++) { 
    executor.submit(longRunningWorker) 
} 

然后LongRunningWorker的定义如下:

class LongRunningWorker(BlockingQueue<String> workQueue) extends Runnable { 
    final BlockingQueue<String> workQueue; 
    LongRunningWorker(BlockingQueue<String> workQueue) { 
     this.workQueue = workQueue; 
    } 

    public void run() { 
     while((String work = workQueue.poll(3, TimeUnit.Second) != null) { 
      try { 
       new Worker(work).run(); 
      } catch (Exception e) { 
       // 
      } 
     } 
    } 
} 
+0

完美无缺,正是我一直在寻找的。谢谢! – Jake

+0

它看起来像代码不工作,检查出来后 - 任何建议?它只是表明它创造了工人的事实,但工作本身从未开始。 – Jake

+0

新工人(工作).run(); < - 所以它应该做的工作 – seneque