2012-06-05 38 views
3

我试图找出内存效率和功能的方式来处理大规模使用字符串中阶数据处理大数据阶的功能性的方式。我已经阅读了很多关于懒惰集合的东西,并且看到了很多代码示例。但是,我一次又一次遇到“超出GC开销”或“Java堆空间”问题。斯卡拉懒收藏

通常的问题是,我试图建立一个懒惰的集合,但是当我把它添加到集种植(我不知道现在任何其他方式这样做增量)评估每一个新的元素。当然,我可以尝试一些方法,例如先初始化一个初始的懒惰集合,然后通过使用地图等应用资源关键型计算来生成具有所需值的集合,但通常我只是不知道最终集合的确切大小最初是懒惰的收集。

也许你可以通过给我提示或解释如何改进下面的代码来帮助我,例如根据奇数序列对属于一个的规则将FASTA(定义如下)格式的文件分割成两个单独的文件档案,甚至一个到另一个(“分离股”)。要做到这一点“最直接的”最简单的方法就是通过循环遍历行并通过打开的文件流打印到相应的文件中(当然这很好)。然而,我只是不喜欢重新赋值给包含头文件和序列的变量的样式,因此下面的示例代码使用(尾)递归,并且我希望找到一种方法来维护类似的设计而不会遇到资源问题!

该示例对于小文件非常适用,但已经在大约500mb左右的文件中,代码将因标准JVM设置而失败。我想处理“仲裁”大小的文件,比如10-20gb左右。引起我

val fileName = args(0) 
val in = io.Source.fromFile(fileName) getLines 

type itType = Iterator[String] 
type sType = Stream[(String, String)] 

def getFullSeqs(ite: itType) = { 
    //val metaChar = ">" 
    val HeadPatt = "(^>)(.+)" r 
    val SeqPatt = "([\\w\\W]+)" r 
    @annotation.tailrec 
    def rec(it: itType, out: sType = Stream[(String, String)]()): sType = 
     if (it hasNext) it next match { 
      case HeadPatt(_,header) => 
       // introduce new header-sequence pair 
       rec(it, (header, "") #:: out) 
      case SeqPatt(seq) => 
       val oldVal = out head 
       // concat subsequences 
       val newStream = (oldVal._1, oldVal._2 + seq) #:: out.tail  
       rec(it, newStream) 
      case _ => 
       println("something went wrong my friend, oh oh oh!"); Stream[(String, String)]()     
     } else out 
    rec(ite)  
} 

def printStrands(seqs: sType) { 
    import java.io.PrintWriter 
    import java.io.File 
    def printStrand(seqse: sType, strand: Int) { 
     // only use sequences of one strand 
     val indices = List.tabulate(seqs.size/2)(_*2 + strand - 1).view 
     val p = new PrintWriter(new File(fileName + "." + strand)) 
     indices foreach { i => 
       p.print(">" + seqse(i)._1 + "\n" + seqse(i)._2 + "\n") 
     }; p.close 
     println("Done bro!") 
    } 
    List(1,2).par foreach (s => printStrand(seqs, s)) 
} 

printStrands(getFullSeqs(in)) 

三个问题:

A)让我们假设一个需要保持通过处理从getLines得到像我getFullSeqs方法的初始迭代器获得一个大的数据结构(注意不同大小ingetFullSeqs输出),因为整体上的转换(!)数据被反复要求,因为一个不知道人会要求在任何步骤将数据的一部分。我的例子可能不是最好的,但怎么做呢?有没有可能? B)当期望的数据结构不是固有的懒惰时,例如,如果想将(header -> sequence)对存储到Map()中,什么时候?你会把它包装在一个懒惰的集合?

C)我构建流的实现可能会颠倒输入行的顺序。当调用reverse时,所有元素都将被评估(在我的代码中,它们已经是,所以这是实际的问题)。有没有什么办法可以从后面以懒惰的方式进行后期处理?我知道reverseIterator,但这是否已经是解决方案,还是不会实际首先评估所有元素(因为我需要在列表中调用它)?我们可以用newVal #:: rec(...)来构建这个流,但是我会失去尾递归,不是吗?

所以我基本上需要的是添加元素到一个集合,这是不加入评估的过程。所以lazy val elem = "test"; elem :: lazyCollection是不是我所期待的。

编辑:我也尝试使用rec的流参数的名称参数。

非常感谢您的关注和时间,我非常感谢您的帮助(再次:))。

////////////////////////////////////////////// ////////////////////////////////////////////////// ////////////////////////////////////////////////// ///////////////////

FASTA定义为一个顺序集合由单个头部行分隔序列。标题被定义为以“>”开头的行。标题下的每一行都被称为与标题相关的序列的一部分。一个序列在新的标题存在时结束。每个标题都是唯一的。例如:

>头1
ABCDEFG
> HEADER2
hijklmn
opqrstu
> HEADER3
VWXYZ
> HEADER4
zyxwv

因此,序列2是两倍大,SEQ 1我的计划是将文件分割成包含一个文件

>头1
ABCDEFG
> HEADER3
VWXYZ

和含有

> HEADER2
hijklmn
opqrstu第二文件B
> HEADER4
zyxwv

输入文件假定由偶数个头部序列组成配对。

+1

http://pastebin.com/bvaKiA3e,做一模一样的必要和非常简洁的方式 - 只是它也可以与文件大小不受限制(然而,这仅仅是在特定的示例中的解决方案,但只要我需要改造整个数据,我会到相同的内存问题) –

回答

4

的关键在于与真正的大数据结构的工作仅在内存是执行任何你需要操作的关键举行。所以,你的情况,这是

  • 输入文件
  • 你的两个输出文件
  • 文本

就是这样的当前行。在某些情况下,您可能需要存储信息,例如序列的长度;在这样的事件中,您将在第一遍中构建数据结构并在第二遍中使用它们。让我们假设,例如,你决定,你想写三个文件:一个用于偶数记录,一个奇怪的,一个是在总长度小于300个核苷酸的条目。你会做这样的事情(警告 - 它编译但我从来没有遇到它,所以它可能不会实际工作):

final def findSizes(
    data: Iterator[String], sz: Map[String,Long] = Map(), 
    currentName: String = "", currentSize: Long = 0 
): Map[String,Long] = { 
    def currentMap = if (currentName != "") sz + (currentName->currentSize) else sz 
    if (!data.hasNext) currentMap 
    else { 
    val s = data.next 
    if (s(0) == '>') findSizes(data, currentMap, s, 0) 
    else findSizes(data, sz, currentName, currentSize + s.length) 
    } 
} 

然后进行处理,您使用的地图,并通过再次:

import java.io._ 
final def writeFiles(
    source: Iterator[String], targets: Array[PrintWriter], 
    sizes: Map[String,Long], count: Int = -1, which: Int = 0 
) { 
    if (!source.hasNext) targets.foreach(_.close) 
    else { 
    val s = source.next 
    if (s(0) == '>') { 
     val w = if (sizes.get(s).exists(_ < 300)) 2 else (count+1)%2 
     targets(w).println(s) 
     writeFiles(source, targets, sizes, count+1, w) 
    } 
    else { 
     targets(which).println(s) 
     writeFiles(source, targets, sizes, count, which) 
    } 
    } 
} 

您再使用Source.fromFile(f).getLines()两次,以创建您的迭代器和你所有的设置。 编辑:从某种意义上说,这是关键的一步,因为这是你的“懒惰”集合。然而,它并不重要,因为它不会立即读取所有内存(“懒惰”),而是因为它不存储任何以前的字符串!

更一般地说,Scala无法帮助您仔细考虑内存中需要的信息以及您可以根据需要获取磁盘的内容。懒惰评估有时可以提供帮助,但没有神奇的公式,因为您可以轻松地表达要求以懒惰的方式将所有数据存储在内存中。斯卡拉不能解释你的命令访问内存,作为秘密,指示从磁盘提取的东西,而不是。 (当然,除非你从磁盘这正是这么做的写一个库缓存结果。)

+0

我实际使用类似的方法(虽然不是一致为你的例子可能是,因为我是用指数瞎搞),但我觉得这样愚蠢地调用getLines一次又一次知道通过流的inifnite列表的示例(fib示例等),其元素仅在需要时才被评估。我仍然需要把这件事记在脑海里。我明白你的观点,非常感谢你的解释!我真的很感激。最后一个问题:对于我如何反复建立惰性集合,你有任何提示吗?或者这又是与这些例子相同的点。 –

+0

@WayneJhukie - 通过流无限列表_only工作,如果你可以放弃,此时通常可以使用迭代器代替stream_的早期部分。如果您只需要一点点历史记录,则流功能非常强大,但确保您不需要保留流首部的副本是非常棘手的。先用迭代器试试吧!由于他们不持有状态,你不太可能遇到麻烦。如果_require_保持状态,则考虑使用流,或者'Iterator.duplicate'(如果您将同时前进两者,因为差异已存储)。 –

3

人们可以构造具有的newval#流:: REC(...),但我会失去 tail-然后递归,不是吗?

其实没有。

所以,这里的东西......你现在的尾递归,你填补了StreamALL与价值观。是的,Stream是懒惰的,但你计算所有的元素,剥离它的任何懒惰。

现在说你做newVal #:: rec(...)。你会失去尾递归吗?没有为什么?因为你没有递归。怎么来的?那么,Stream是懒惰的,所以它不会评估rec(...)

而这就是它的美丽。一旦你这样做,getFullSeqs在第一次交互时返回,并且只在printStrands要求时计算“递归”。不幸的是,这不会因为就是工作......

的问题是,你不断修改Stream - 这不是你如何使用Stream。随着Stream,你总是附加到它。不要保留“重写”Stream

现在,我还可以通过printStrands轻松识别出另外三个问题。首先,它在seqs上调用size,这将导致整个Stream被处理,失去了懒惰。千万不要致电sizeStream。其次,您拨applyseqse,通过索引访问它。不要致电applyStream(或List) - 这是非常低效的。这是O(n),这使得你的内部循环O(n^2) - 是的,在输入文件的标题数量二次方!最后,在整个printStrand的执行过程中,printStrands保留对seqs的引用,防止处理元素被垃圾收集。

所以,这里的第一个近似:

def inputStreams(fileName: String): (Stream[String], Stream[String]) = { 
    val in = (io.Source fromFile fileName).getLines.toStream 
    val SeqPatt = "^[^>]".r 
    def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = { 
    if (s.isEmpty) Stream.empty 
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false) 
     else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true) 
    } 
    (demultiplex(in, skip = false), demultiplex(in, skip = true)) 
} 

上述的问题,而我只显示在lazyness的问题,进一步引导代码,是即时你这样做:

val (a, b) = inputStreams(fileName) 

您将保留对两个流头的引用,这可以防止垃圾收集它们。你不能保留对它们的引用,所以你必须在获得它们后立即使用它们,而不要将它们存储在“val”或“lazy val”中。一个“var”可能会做,但处理起来会很棘手。所以让我们试试这个:

def inputStreams(fileName: String): Vector[Stream[String]] = { 
    val in = (io.Source fromFile fileName).getLines.toStream 
    val SeqPatt = "^[^>]".r 
    def demultiplex(s: Stream[String], skip: Boolean): Stream[String] = { 
    if (s.isEmpty) Stream.empty 
    else if (skip) demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = false) 
     else s.head #:: (s.tail takeWhile (SeqPatt findFirstIn _ nonEmpty)) #::: demultiplex(s.tail dropWhile (SeqPatt findFirstIn _ nonEmpty), skip = true) 
    } 
    Vector(demultiplex(in, skip = false), demultiplex(in, skip = true)) 
} 

inputStreams(fileName).zipWithIndex.par.foreach { 
    case (stream, strand) => 
    val p = new PrintWriter(new File("FASTA" + "." + strand)) 
    stream foreach p.println 
    p.close 
} 

仍然无法正常工作,因为streaminputStreams作品作为参照,保持整个数据流在内存中打印即使它们。

因此,已经再次失败,我该怎么建议?把事情简单化。

def in = (scala.io.Source fromFile fileName).getLines.toStream 
def inputStream(in: Stream[String], strand: Int = 1): Stream[(String, Int)] = { 
    if (in.isEmpty) Stream.empty 
    else if (in.head startsWith ">") (in.head, 1 - strand) #:: inputStream(in.tail, 1 - strand) 
     else      (in.head, strand) #:: inputStream(in.tail, strand) 
} 
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i))) 
inputStream(in) foreach { 
    case (line, strand) => printers(strand) println line 
} 
printers foreach (_.close) 

现在,这将不会保留在内存中超过必要的。不过,我仍然认为这太复杂了。这可以更容易地完成这样的:

def in = (scala.io.Source fromFile fileName).getLines 
val printers = Array.tabulate(2)(i => new PrintWriter(new File("FASTA" + "." + i))) 
def printStrands(in: Iterator[String], strand: Int = 1) { 
    if (in.hasNext) { 
    val next = in.next 
    if (next startsWith ">") { 
     printers(1 - strand).println(next) 
     printStrands(in, 1 - strand) 
    } else { 
     printers(strand).println(next) 
     printStrands(in, strand) 
    } 
    } 
} 
printStrands(in) 
printers foreach (_.close) 

或者只是使用一个while循环而不是递归。

现在,到了其他问题:

B)这可能是有意义的这样做的同时阅读它,这样你就不必保持数据的副本:在MapSeq

C)请勿扭转Stream - 你将失去所有的懒惰。只是为了完整性

+0

太棒了!Thx在你花费在你的解释和代码示例上的时间太多了!+1。我承认我的例子是不是最好的,作为文件打印只是为了说明预处理步骤我真正感兴趣的后一个可能的任务只是为了澄清:这里面''stream' inputStream'do你的意思,并在那里做这个不同于下一个代码(我的意思是第一个和第二个不是“失败”)?而且,在你看来,这是第一个从头开始构建一个懒惰集合的方式(比如说我想实现我自己的'getLines'版本)吗? –

+0

@WayneJhukie是的,重新阅读我可以看到它不清楚。我的意思是由这一行声明的'stream':case(stream,strand)=>'。下一个代码的主要区别在于下一个代码没有嵌套的'foreach'。第一种情况有一个外部'foreach',它只是简单地选择两个输入流。第二种情况消除了多个数据流,因此您只有一个数据流可以迭代,这使得可以释放数据。请注意,在这个例子中'inputStream'是一个'def',所以没有指向流的'head'的指针。 –

+0

@WayneJhukie而且,是的,我认为第一个工作示例展示了构建懒惰集合的实用方法。请注意,就输入/输出而言,功能解决方案是迭代器。迭代是斯卡拉现在积极发展的领域:蓝眼睛,斯卡拉兹和Play都有它的一个版本。 –