2011-03-21 27 views
0

我有一个列表只读数据是我想要处理的互联网上的一串端点。我想知道是否有任何类型的内置类或模式,我应该遵循来处理?使用线程处理列表的智能方式

现在我只是跳水的初始列表为ñ块和创造ñ线程来处理每个请求。

回答

4

使用ExecutorService来处理您的并发处理。

public void processAll(List<Endpoint> endpoints, int numThreads) { 
    ExecutorService executor = Executors.newFixedThreadPool(numThreads); 

    for(final Endpoint endpoint : endpoints) { 
     executor.submit(new Runnable() { 
      @Override 
      public void run() { 
       doProcessing(endpoint); 
      } 
     }); 
    } 
    // instead of a loop, you could also use ExecutorService#invokeAll() 
    // with the passed-in list if Endpoint implements 
    // java.util.concurrent.Callable 

    executor.shutdown(); 
} 

private void doProcessing(Endpoint endpoint) { 
    // do whatever you do with each one 
} 

这只是一个简单的例子。查看一些关于如何使用更具体的ExecutorService类型的示例的API,处理Futures,并做各种漂亮的东西。

0

查看java.util.concurrent包和ExecutorService。

Brian Goetz的书Java Concurrency in Practice是理解这些东西的必备工具。

2

听起来像是Queue(使用java.util.concurrent中的一个实现)就是你需要的。这样,每个线程在准备就绪时就可以获得链接,这比事先进行分区更有意义。

0

阻塞队列可能是最适合您的方式。谷歌它,你会发现很多的信息,这是一个很好的教程:http://www.developer.com/java/ent/article.php/3645111/Java-5s-BlockingQueue.htm

1

您将需要三认为:

  • 两个阻塞列表 - 用数据来porcess的结果第一,第二
  • 执行人服务
  • 某种锁

我的示例应用程序:

public class App { 

    private static final int NUMBER_OF_THREADS = 3; 

    public static void main(String[] args) throws InterruptedException { 

     BlockingQueue<String> data = prepareData(); 

     BlockingQueue<String> results = new LinkedBlockingQueue<String>(); 

     ExecutorService executor = Executors.newFixedThreadPool(3); 
     CountDownLatch countDownLatch = new CountDownLatch(3); 

     for (int i = 0; i < NUMBER_OF_THREADS; i++) 
      executor.execute(new Processor<String>(data, results, 
        countDownLatch, i + "")); 

     countDownLatch.await(); 
    } 

    private static BlockingQueue<String> prepareData() { 
     BlockingQueue<String> queue = new LinkedBlockingQueue<String>(); 
     for (int i = 0; i < 1000; i++) { 
      queue.add(i + ""); 
     } 
     return queue; 
    } 

} 

class Processor<T> implements Runnable { 

    private BlockingQueue<T> dataSource; 

    private CountDownLatch latch; 

    private String name; 

    private BlockingQueue<String> results; 

    public Processor(BlockingQueue<T> dataSource, 
      BlockingQueue<String> results, CountDownLatch countDownLatch, 
      String processName) { 
     this.dataSource = dataSource; 
     this.results = results; 
     this.latch = countDownLatch; 
     this.name = processName; 
    } 

    @Override 
    public void run() { 
     T t = null; 
     while ((t = dataSource.poll()) != null) { 
      try { 
       String result = "Process " + name + " processing: " 
         + t.toString(); 
       System.out.println(result); 
       results.put(result); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
     latch.countDown(); 
    } 
} 

准备好数据后创建一些处理器来处理数据。每个处理器都有对线程保存数据源的引用。获取对象,处理它们并最终将结果放到另一个包含结果的线程保存集合中。

当数据源变空时,然后调用latch.countDown()向等待结果的主线程或线程说“完成了一切”。