2011-05-11 41 views
2

想象一下java.io.File对象的序列。序列没有任何特定的顺序,它在目录遍历之后被填充。这些文件的名称可以是这样的:使用平行阵列处理此数据的首选方式

/some/file.bin 
/some/other_file_x1.bin 
/some/other_file_x2.bin 
/some/other_file_x3.bin 
/some/other_file_x4.bin 
/some/other_file_x5.bin 
... 
/some/x_file_part1.bin 
/some/x_file_part2.bin 
/some/x_file_part3.bin 
/some/x_file_part4.bin 
/some/x_file_part5.bin 
... 
/some/x_file_part10.bin 

基本上,我可以有3种类型的文件。第一种是简单的,只有.bin的扩展名。第二种类型的文件是从_x1.bin_x5.bin形成的文件。第三种类型的文件可以由10个较小的部分组成,从_part1_part10。 我知道命名可能很奇怪,但这是我必须与之合作:)

我想将文件组合在一起(文件的所有部分都应该一起处理),并且我正在考虑使用并行阵列来做到这一点。我不确定的是如何执行reduce/acumulation部分,因为所有线程都将在同一个数组上工作。

val allBinFiles = allBins.toArray // array of java.io.File 

我想处理这样的事情:

val mapAcumulator = java.util.Collections.synchronizedMap[String,ListBuffer[File]](new java.util.HashMap[String,ListBuffer[File]]()) 

allBinFiles.par.foreach { file => 
    file match { 
     // for something like /some/x_file_x4.bin nameTillPart will be /some/x_file 
     case ComposedOf5Name(nameTillPart) => { 
      mapAcumulator.getOrElseUpdate(nameTillPart,new ListBuffer[File]()) += file 
     } 
     case ComposedOf10Name(nameTillPart) => { 
      mapAcumulator.getOrElseUpdate(nameTillPart,new ListBuffer[File]()) += file 
     } 
     // simple file, without any pieces 
     case _ => { 
      mapAcumulator.getOrElseUpdate(file.toString,new ListBuffer[File]()) += file 
     } 
    } 
} 

我想这样做就像我在上面的代码中已经证明的。提取文件的提取器,并将部分路径用作映射中的键。例如,/some/x_file可以保存为值/some/x_file_x1.bin/some/x_file_x5.bin。我也认为可能有更好的方法来处理这个问题。我会对你的意见感兴趣。

+0

这是必须运行一次还是需要定期执行?这些文件是否会在某个时候被读取?如果是这样,那么任务可能是IO限制的,并且优化(并行化)至少不成熟,如果不是完全不必要的话。 – 2011-05-11 08:12:46

+0

稍后将部分读取文件,根据其内容完成一些处理,并且会发生大量压缩。我也打算并行压缩。 – Geo 2011-05-11 08:15:06

回答

1

的替代方法是使用groupBy

val mp = allBinFiles.par.groupBy { 
    case ComposedOf5Name(x) => x 
    case ComposedOf10Name(x) => x 
    case f => f.toString 
} 

这将返回文件的平行阵列(ParMap[String, ParArray[File]])的并联地图。如果你想从这个点文件的连续序列的顺序图:

val sqmp = mp.map(_.seq).seq 

要确保并行踢,要确保你有你足够的元素并行阵列(10K +)。

+0

大小取决于它通常有几千个文件,但我不知道它是否始终是10k +。为什么/如何定义10k限制? – Geo 2011-05-11 08:18:10

+0

这不是一个硬限制,只是拇指的一个规则。这取决于您的高阶操作员的昂贵。通常,当高阶操作员非常便宜时,对于大多数操作,至少需要10k个元素来注意加速。这是由于底层的fork/join框架及其开销(切换线程上下文,同步等)。在你的情况下,如果你有几个提取器在你的HOP中搜索字符串,几千个就足够了。我建议你尝试对它进行基准测试,看看你是否加速。 – axel22 2011-05-11 08:22:22

+0

顺便提一下,groupBy将把所有5个部分文件和10个部分文件中的所有文件组合在一个数组中。我想根据除'_part' /'_x'之外的名称分别对它们进行分组。 – Geo 2011-05-12 08:40:04