此做一些事情。 aggregate
就像fold
除了你也结合顺序折叠的结果。
更新:在.par.groupBy
中有开销并不奇怪,但我对常数因素感到惊讶。通过这些数字,你永远不会那样计数。另外,我不得不通过记忆的方式。
有趣的技术,用于构建结果图is described in this paper链接从the overview。 (它聪明地保存了中间结果,然后在最后并行合并它们)。
但是,如果你真的想要的只是一个计数,那么复制groupBy
的中间结果会很昂贵。
这些数字比较顺序groupBy
,并行,最后是aggregate
。
[email protected]:~/tmp$ scalacm countints.scala ; scalam -J-Xms8g -J-Xmx8g -J-Xss1m countints.Test
GroupBy: Starting...
Finished in 12695
GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Par GroupBy: Starting...
Finished in 51481
Par GroupBy: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
Aggregate: Starting...
Finished in 2672
Aggregate: List((233,10078), (237,20041), (268,9939), (279,9958), (315,10141), (387,9917), (462,9937), (680,9932), (848,10139), (858,10000))
测试代码中没有什么神奇的东西。
import collection.GenTraversableOnce
import collection.concurrent.TrieMap
import collection.mutable
import concurrent.duration._
trait Timed {
def now = System.nanoTime
def timed[A](op: =>A): A = {
val start = now
val res = op
val end = now
val lapsed = (end - start).nanos.toMillis
Console println s"Finished in $lapsed"
res
}
def showtime(title: String, op: =>GenTraversableOnce[(Int,Int)]): Unit = {
Console println s"$title: Starting..."
val res = timed(op)
//val showable = res.toIterator.min //(res.toIterator take 10).toList
val showable = res.toList.sorted take 10
Console println s"$title: $showable"
}
}
它生成一些感兴趣的随机数据。
object Test extends App with Timed {
val upto = math.pow(10,6).toInt
val ran = new java.util.Random
val ten = (1 to 10).toList
val maxSamples = 1000
// samples of ten random numbers in the desired range
val samples = (1 to maxSamples).toList map (_ => ten map (_ => ran nextInt upto))
// pick a sample at random
def anyten = samples(ran nextInt maxSamples)
def mag = 7
val data: Vector[List[Int]] = Vector.fill(math.pow(10,mag).toInt)(anyten)
的顺序操作和aggregate
组合操作是从任务调用,并将结果指定给易失性变种。
def z: mutable.Map[Int,Int] = mutable.Map.empty[Int,Int]
def so(m: mutable.Map[Int,Int], is: List[Int]) = {
for (i <- is) {
val v = m.getOrElse(i, 0)
m(i) = v + 1
}
m
}
def co(m: mutable.Map[Int,Int], n: mutable.Map[Int,Int]) = {
for ((i, count) <- n) {
val v = m.getOrElse(i, 0)
m(i) = v + count
}
m
}
showtime("GroupBy", data.flatten groupBy identity map { case (k, vs) => (k, vs.size) })
showtime("Par GroupBy", data.flatten.par groupBy identity map { case (k, vs) => (k, vs.size) })
showtime("Aggregate", data.par.aggregate(z)(so, co))
}
有趣,但是不会导致为'data'中的每个元素创建一个Map吗? – mitchus
@mitchus z更有意义,因为z是可变的,所以每个顺序操作一个映射,这是一个单线程任务,但我懒得解决它。我会把它放在我的身上。 –
@mitchus更新为使用可变结果,这只是起作用。看到令人惊讶的数字。或者,也许他们并不奇怪。 –