2012-01-05 45 views
2

我需要用Java编写一个程序,它将读取目录树中相对大量(〜50,000个)文件,处理数据并将处理后的数据输出到单独的(平坦的)目录。并行读取和写入多个文件

目前,我有这样的事情:

private void crawlDirectoyAndProcessFiles(File directory) { 
    for (File file : directory.listFiles()) { 
    if (file.isDirectory()) { 
     crawlDirectoyAndProcessFiles(file); 
    } else { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
    } 
} 

我只想说,每个这样的方法被删除,下调为便于阅读,但他们都做工精细。整个过程工作正常,但速度很慢。数据处理通过远程服务进行,需要5-15秒。乘以5万...

我从来没有做过任何事情多线程之前,但我想我可以得到一些相当不错的速度提高,如果我这样做。任何人都可以提供一些指导我如何有效地平行这种方法?

+0

文件的大小和处理的密度如何?我问,因为如果有更多的时间花在从磁盘读取文件上,那么你实际上不会在线程中获得太多的收益。 – SimonC 2012-01-05 05:01:05

+1

对于几乎可以确定为磁盘绑定的任务,您不太可能获得任何加速。除非您试图并行化位于不同物理驱动器上的目录... – Mysticial 2012-01-05 05:02:00

+0

您是将输出转换为单个文件还是每个文件的文件? – MahdeTo 2012-01-05 05:13:56

回答

5

我会使用ThreadPoolExecutor来管理线程。你可以做这样的事情:

private class Processor implements Runnable { 
    private final File file; 

    public Processor(File file) { 
     this.file = file; 
    } 

    @Override 
    public void run() { 
     Data d = readFile(file); 
     ProcessedData p = d.process(); 
     writeFile(p,file.getAbsolutePath(),outputDir); 
    } 
} 

private void crawlDirectoryAndProcessFiles(File directory, Executor executor) { 
    for (File file : directory.listFiles()) { 
     if (file.isDirectory()) { 
      crawlDirectoryAndProcessFiles(file,executor); 
     } else { 
      executor.execute(new Processor(file); 
     } 
    } 
} 

你会使用获得执行人:

ExecutorService executor = Executors.newFixedThreadPool(poolSize); 

其中poolSize是你想要去的线程在一次的最大数量。 (在这里有一个合理的数字非常重要; 50,000个线程并不是一个好主意,合理的数字可能是8)。请注意,在排队所有文件后,主线程可以等待,直到调用完成executor.awaitTermination

+0

在最后考虑一个“连接”,以确保所有处理都完成,以便与原始方法的行为保持一致 – 2012-01-06 10:11:04

+1

@AdrianShum - 好点;我建议现在使用'ExecutorService.awaitTermination()' – 2012-01-06 18:20:51

+1

这个例子最好用fork连接池运行,这个池可以和'new ForkJoinPool(numprocs)'一起使用并等待终止。密集流程实际上对这些池的效果最好,而像Fibonacci序列这样的小流程对单线程或线程执行程序来说可能是最好的(对于正确管理的自定义代码更好)。 – 2013-12-17 16:02:41

1

最简单的(也许是最合理的)方法之一是有一个线程池(看看相应的执行程序)。主线程负责在目录中进行爬网。遇到文件时,创建一个“作业”(Runnable/Callable)并让Executor处理作业。

(这应该是足以让你开始,我不想让太多的具体代码怎么把它不应该是你很难搞清楚,一旦您已经阅读执行人,赎回等部分)

5

假设您有一个硬盘(即只允许单个同时读取操作的东西,而不是SSD或RAID阵列,网络文件系统等),那么您只需要一个线程执行IO(从/写入磁盘)。此外,您只需要尽可能多的线程执行CPU内核操作,否则时间将会在上下文切换中浪费。

鉴于上述限制,下面的代码应该适合您。单线程执行程序确保一次只能执行一个Runnable。固定线程池确保不超过NUM_CPUSRunnable s在任何时间都在执行。

这件事不能做的一件事是提供处理完成时间的反馈。

private final static int NUM_CPUS = 4; 

private final Executor _fileReaderWriter = Executors.newSingleThreadExecutor(); 
private final Executor _fileProcessor = Executors.newFixedThreadPool(NUM_CPUS); 

private final class Data {} 
private final class ProcessedData {} 

private final class FileReader implements Runnable 
{ 
    private final File _file; 
    FileReader(final File file) { _file = file; } 
    @Override public void run() 
    { 
    final Data data = readFile(_file); 
    _fileProcessor.execute(new FileProcessor(_file, data)); 
    } 

    private Data readFile(File file) { /* ... */ return null; }  
} 

private final class FileProcessor implements Runnable 
{ 
    private final File _file; 
    private final Data _data; 
    FileProcessor(final File file, final Data data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    final ProcessedData processedData = processData(_data); 
    _fileReaderWriter.execute(new FileWriter(_file, processedData)); 
    } 

    private ProcessedData processData(final Data data) { /* ... */ return null; } 
} 

private final class FileWriter implements Runnable 
{ 
    private final File _file; 
    private final ProcessedData _data; 
    FileWriter(final File file, final ProcessedData data) { _file = file; _data = data; } 
    @Override public void run() 
    { 
    writeFile(_file, _data); 
    } 

    private Data writeFile(final File file, final ProcessedData data) { /* ... */ return null; } 
} 

public void process(final File file) 
{ 
    if (file.isDirectory()) 
    { 
    for (final File subFile : file.listFiles()) 
     process(subFile); 
    } 
    else 
    { 
    _fileReaderWriter.execute(new FileReader(file)); 
    } 
} 
+0

当你调用'_fileReaderWriter.execute(new FileWriter(_file,processedData));',这是一个异步调用? – 2014-01-29 17:14:38

+0

是的,它会在'_fileReaderWriter'队列的一个线程上执行一个新任务。 – SimonC 2014-02-03 13:07:28