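seqOp takes an accumulator of the same type as the initial value as its first argument, and a value of the same type as the values in the RDD as its second.
combOp takes two accumulators of the same type as the initial value.
Assuming you want to aggregate RDD[(T,U)]: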
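To make the two roles concrete, here is a minimal plain-Scala sketch that needs no Spark. It only imitates what happens for a single key: each partition is folded with seqOp starting from the zero value, and the partial results are then merged with combOp. The object name, the `partitions` data and the helper names are hypothetical and chosen for illustration only.

```scala
object AggregateSketch {
  type Acc = Map[Char, Int]

  // seqOp: (accumulator, value) => accumulator
  def seqOp(acc: Acc, value: String): Acc =
    acc + (value.head -> (acc.getOrElse(value.head, 0) + 1))

  // combOp: (accumulator, accumulator) => accumulator
  def combOp(a: Acc, b: Acc): Acc =
    b.foldLeft(a) { case (m, (k, v)) => m + (k -> (m.getOrElse(k, 0) + v)) }

  // Hypothetical partitions holding the values that belong to one key.
  val partitions: Seq[Seq[String]] =
    Seq(Seq("apple", "avocado", "banana"), Seq("blueberry", "cherry"))

  // The zero value has the accumulator type, not the value type.
  val zero: Acc = Map.empty

  // Fold every partition with seqOp, then merge the partial maps with combOp.
  val perPartition: Seq[Acc] = partitions.map(_.foldLeft(zero)(seqOp))
  val result: Acc = perPartition.reduce(combOp)
}
```

Note that seqOp mixes two types (accumulator and value), while combOp only ever sees accumulators.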
def fSeq(accumCount: Map[Char, Int], value: U): Map[Char, Int] = ???
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = ???
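I guess that in your case U is simply String, so you should adjust the fSeq signature.
BTW, you can use a Map with a default value to simplify your functions: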
val firstLet = Map[Char, Int]().withDefault(x => 0)
def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = {
  accumCount + (value.head -> (accumCount(value.head) + 1))
}
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = {
  val accum = (accum1.keys ++ accum2.keys).map(k => (k, accum1(k) + accum2(k)))
  accum.toMap.withDefault(x => 0)
}
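A small sketch of why fComb calls withDefault again on its result: to my understanding, adding an entry with `+` keeps the default, but rebuilding the map through `map` and `toMap` yields a plain Map without it. The object and value names below are hypothetical.

```scala
object WithDefaultSketch {
  val counts: Map[Char, Int] = Map('a' -> 1).withDefault(_ => 0)

  // A missing key falls back to the default instead of throwing.
  val missing: Int = counts('z')

  // Adding an entry with `+` keeps the default function.
  val afterAdd: Map[Char, Int] = counts + ('b' -> 2)
  val stillDefault: Int = afterAdd('z')

  // Rebuilding via map/toMap produces a plain Map: the default is gone,
  // so looking up a missing key throws NoSuchElementException.
  val rebuilt: Map[Char, Int] = counts.map { case (k, v) => (k, v * 2) }.toMap
  val lostDefault: Boolean = scala.util.Try(rebuilt('z')).isFailure
}
```

This is also why the zero value passed to aggregateByKey should already carry the default.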
Finally, it could be more efficient to use scala.collection.mutable.Map:
import scala.collection.mutable.{Map => MMap}
def firstLetM = MMap[Char, Int]().withDefault(x => 0)
def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] = {
  accumCount += (value.head -> (accumCount(value.head) + 1))
}
def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = {
  accum2.foreach{case (k, v) => accum1 += (k -> (accum1(k) + v))}
  accum1
}
Test:
def randomChar() = (scala.util.Random.nextInt.abs % 58 + 65).toChar
def randomString() = {
  (Seq(randomChar) ++ Iterator.iterate(randomChar)(_ => randomChar)
    .takeWhile(_ => scala.util.Random.nextFloat > 0.1)).mkString
}
val stringRDD = sc.parallelize(
  (1 to 500000).map(_ => (scala.util.Random.nextInt.abs % 60, randomString)))
val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
  (accumCount, value) => accumCount.get(value.head) match {
    case None => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  },
  (accum1, accum2) => accum1 ++ accum2.map{
    case(k,v) => k -> (v + accum1.getOrElse(k, 0))}
).collectAsMap()
val firstLetter2 = stringRDD
  .aggregateByKey(firstLet)(fSeq, fComb)
  .collectAsMap
val firstLetter3 = stringRDD
  .aggregateByKey(firstLetM)(fSeqM, fCombM)
  .mapValues(_.toMap)
  .collectAsMap
firstLetter == firstLetter2
firstLetter == firstLetter3
And what is the input here? 'RDD[(T, String)]'? – zero323