2013-08-20 46 views
2

我的问题与Count occurrences of each element in a List[List[T]] in Scala非常相似,只是我想要一个涉及parallel collections的高效解决方案。Scala并行集合中每个项目的发生次数

具体而言,我有个整数的一个大的(〜10^7)矢量的短vec(〜10)列表和我想获得每个诠释xx发生时,例如作为Map[Int,Int]的次数。不同整数的数量是10^6的数量级。由于需要完成的机器具有相当数量的内存(150GB)和内核数量(> 100),因此平行集合似乎是一个不错的选择。下面的代码是一个好方法吗?

val flatpvec = vec.par.flatten 
val flatvec = flatpvec.seq 
val unique = flatpvec.distinct 
val counts = unique map (x => (x -> flatvec.count(_ == x))) 
counts.toMap 

还是有更好的解决方案?如果你想知道的.SEQ转化:由于某种原因,下面的代码似乎并没有终止,即使是小例子:

val flatpvec = vec.par.flatten 
val unique = flatpvec.distinct 
val counts = unique map (x => (x -> flatpvec.count(_ == x))) 
counts.toMap 

回答

3

此做一些事情。 aggregate就像fold除了你也结合顺序折叠的结果。

更新:在.par.groupBy中有开销并不奇怪,但我对常数因素感到惊讶。通过这些数字,你永远不会那样计数。另外,我不得不通过记忆的方式。

有趣的技术,用于构建结果图is described in this paper链接从the overview。 (它聪明地保存了中间结果,然后在最后并行合并它们)。

但是,如果你真的想要的只是一个计数,那么复制groupBy的中间结果会很昂贵。

这些数字比较顺序groupBy,并行,最后是aggregate

[email protected]:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test 
GroupBy: Starting... 
Finished in 12695 
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 
Par GroupBy: Starting... 
Finished in 51481 
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 
Aggregate: Starting... 
Finished in 2672 
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000)) 

测试代码中没有什么神奇的东西。

import collection.GenTraversableOnce 
import collection.concurrent.TrieMap 
import collection.mutable 

import concurrent.duration._ 

trait Timed { 
    def now = System.nanoTime 
    def timed[A](op: =>A): A = { 
    val start = now 
    val res = op 
    val end = now 
    val lapsed = (end - start).nanos.toMillis 
    Console println s"Finished in $lapsed" 
    res 
    } 
    def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = { 
    Console println s"$title: Starting..." 
    val res = timed(op) 
    //val showable = res.toIterator.min //(res.toIterator take 10).toList 
    val showable = res.toList.sorted take 10 
    Console println s"$title: $showable" 
    } 
} 

它生成一些感兴趣的随机数据。

object Test extends App with Timed { 

    val upto = math.pow(10,6).toInt 
    val ran = new java.util.Random 
    val ten = (1 to 10).toList 
    val maxSamples = 1000 
    // samples of ten random numbers in the desired range 
    val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto)) 
    // pick a sample at random 
    def anyten = samples(ran nextInt maxSamples) 
    def mag = 7 
    val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten) 

的顺序操作和aggregate组合操作是从任务调用,并将结果指定给易失性变种。

def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int] 
    def so(m: mutable.Map[Int,Int], is: List[Int]) = { 
    for (i <- is) { 
     val v = m.getOrElse(i, 0) 
     m(i) = v + 1 
    } 
    m 
    } 
    def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = { 
    for ((i, count) <- n) { 
     val v = m.getOrElse(i, 0) 
     m(i) = v + count 
    } 
    m 
    } 
    showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) }) 
    showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) }) 
    showtime("Aggregate", data.par.aggregate(z)(so, co)) 
} 
+0

有趣,但是不会导致为'data'中的每个元素创建一个Map吗? – mitchus

+0

@mitchus z更有意义,因为z是可变的,所以每个顺序操作一个映射,这是一个单线程任务,但我懒得解决它。我会把它放在我的身上。 –

+0

@mitchus更新为使用可变结果,这只是起作用。看到令人惊讶的数字。或者,也许他们并不奇怪。 –

2

如果你想使用并行收集和Scala的标准工具,你可以这样做。集团您的收藏由身份,然后将其映射到(价值,计数):

scala> val longList = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7) 
longList: List[Int] = List(1, 5, 2, 3, 7, 4, 2, 3, 7, 3, 2, 1, 7)                        

scala> longList.par.groupBy(x => x) 
res0: scala.collection.parallel.immutable.ParMap[Int,scala.collection.parallel.immutable.ParSeq[Int]] = ParMap(5 -> ParVector(5), 1 -> ParVector(1, 1), 2 -> ParVector(2, 2, 2), 7 -> ParVector(7, 7, 7), 3 -> ParVector(3, 3, 3), 4 -> ParVector(4))                  

scala> longList.par.groupBy(x => x).map(x => (x._1, x._2.size)) 
res1: scala.collection.parallel.immutable.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1)           

甚至像pagoda_5b更好的意见建议:

scala> longList.par.groupBy(identity).mapValues(_.size) 
res1: scala.collection.parallel.ParMap[Int,Int] = ParMap(5 -> 1, 1 -> 2, 2 -> 3, 7 -> 3, 3 -> 3, 4 -> 1) 
+0

好主意,我会试试。 – mitchus

+2

您可以使用'identity(_)'函数作为'groupBy'的参数而不是'x => x'的一些小改进。你也可以用'mapValues(_“)来映射分组'Map'的值。大小)' –

+0

伟大的建议,pagoda_5b。我将它添加到答案中。 :) –

相关问题