
Spark aggregateByKey using a Map and defining the data types of the functions

So the title is probably confusing, so I will do my best to explain. I am trying to break this functionality out into defined functions so that the other teams who will be writing against my code can better understand how aggregateByKey works. I have the following aggregation:

val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
     (accumCount, value) => accumCount.get(value.head) match { 
     case None => accumCount + (value.head -> 1) 
     case Some(count) => accumCount + (value.head -> (count + 1)) 
     }, 
     (accum1, accum2) => accum1 ++ accum2.map{case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
    ).collect() 
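
For a single key, the seqOp above behaves like a fold over that key's values. A purely local sketch of that logic (no Spark; the sample strings are made up):

// Local illustration only: fold some hypothetical values for one key
// with the same logic as the seqOp above.
val sampleValues = Seq("spark", "scala", "hadoop", "hive")

val sampleCounts = sampleValues.foldLeft(Map[Char, Int]()) { (accumCount, value) =>
  accumCount.get(value.head) match {
    case None => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  }
}
// sampleCounts contains 's' -> 2 and 'h' -> 2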

I have been wanting to break it out like the following:

val firstLet = Map[Char, Int]()

def fSeq(accumCount: ?, value: ?) = {
  accumCount.get(value.head) match {
    case None => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  }
}

def fComb(accum1: ?, accum2: ?) = {
  accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }
}

Since the initial value is a Map[Char, Int], I am not sure what to make the accumCount and value data type definitions. I have tried different things but haven't gotten anything to work. Can someone help me define the data types and explain how you determined them?


And what is the input here? RDD[(T, String)]? – zero323

Answer

  • seqOp takes an accumulator of the same type as the initial value as its first argument, and a value of the same type as the values in the RDD as its second argument.
  • combOp takes two accumulators of the same type as the initial value.

Assuming you want to aggregate RDD[(T, U)]:

def fSeq(accumCount: Map[Char, Int], value: U): Map[Char, Int] = ??? 
def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = ??? 

I guess in your case U is simply String, so you should adjust the fSeq signature accordingly.
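
Filling that in with the bodies from the question gives something like this (a sketch, assuming the values are plain Strings):

def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = {
  // per-value step: bump the count for the first character of the string
  accumCount.get(value.head) match {
    case None => accumCount + (value.head -> 1)
    case Some(count) => accumCount + (value.head -> (count + 1))
  }
}

def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = {
  // merge step: add up the counts from the two partial maps
  accum1 ++ accum2.map { case (k, v) => k -> (v + accum1.getOrElse(k, 0)) }
}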

BTW, you can use a map with a default value and simplify the functions:

val firstLet = Map[Char, Int]().withDefault(x => 0) 

def fSeq(accumCount: Map[Char, Int], value: String): Map[Char, Int] = { 
    accumCount + (value.head -> (accumCount(value.head) + 1)) 
} 

def fComb(accum1: Map[Char, Int], accum2: Map[Char, Int]): Map[Char, Int] = { 
    val accum = (accum1.keys ++ accum2.keys).map(k => (k, accum1(k) + accum2(k))) 
    accum.toMap.withDefault(x => 0) 
} 

Finally, it can be more efficient to use scala.collection.mutable.Map:

import scala.collection.mutable.{Map => MMap} 

def firstLetM = MMap[Char, Int]().withDefault(x => 0) 

def fSeqM(accumCount: MMap[Char, Int], value: String): MMap[Char, Int] = { 
    accumCount += (value.head -> (accumCount(value.head) + 1)) 
} 

def fCombM(accum1: MMap[Char, Int], accum2: MMap[Char, Int]): MMap[Char, Int] = { 
    accum2.foreach{case (k, v) => accum1 += (k -> (accum1(k) + v))} 
    accum1 
} 
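
A quick local sanity check of the mutable variants (plain Scala, no Spark; the sample strings are hypothetical):

// Build two partial accumulators with fSeqM, then merge them with fCombM.
val partial1 = Seq("apple", "avocado", "banana").foldLeft(firstLetM)(fSeqM)
val partial2 = Seq("apricot", "blueberry").foldLeft(firstLetM)(fSeqM)
val mergedM = fCombM(partial1, partial2)
// mergedM contains 'a' -> 3 and 'b' -> 2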

Test:

def randomChar() = (scala.util.Random.nextInt.abs % 58 + 65).toChar 
def randomString() = { 
    (Seq(randomChar) ++ Iterator.iterate(randomChar)(_ => randomChar) 
     .takeWhile(_ => scala.util.Random.nextFloat > 0.1)).mkString 
} 

val stringRDD = sc.parallelize(
    (1 to 500000).map(_ => (scala.util.Random.nextInt.abs % 60, randomString))) 


val firstLetter = stringRDD.aggregateByKey(Map[Char, Int]())(
    (accumCount, value) => accumCount.get(value.head) match { 
    case None => accumCount + (value.head -> 1) 
    case Some(count) => accumCount + (value.head -> (count + 1)) 
    }, 
    (accum1, accum2) => accum1 ++ accum2.map{ 
    case(k,v) => k -> (v + accum1.getOrElse(k, 0))} 
).collectAsMap() 

val firstLetter2 = stringRDD 
    .aggregateByKey(firstLet)(fSeq, fComb) 
    .collectAsMap 

val firstLetter3 = stringRDD 
    .aggregateByKey(firstLetM)(fSeqM, fCombM) 
    .mapValues(_.toMap) 
    .collectAsMap 


firstLetter == firstLetter2
firstLetter == firstLetter3

Thanks for the additional update, it makes the code easier to read and work with. – theMadKing


BTW, this code isn't working correctly. The old code counted all of the keys and counts; for whatever reason this code stops at a count of 50. My old code doesn't do that! – theMadKing


Can you provide sample input? – zero323
