2016-05-10 41 views
0

下面这段代码是应该找到每密钥的平均使用combineByKey():CombineBy重点星火方法

val result = input.combineByKey(
(v) => (v, 1), 
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), 
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)). 
map{ case (key, value) => (key, value._1/value._2.toFloat) } 
result.collectAsMap().map(println(_)) 

我上面的方法的执行混乱。假设我们有数据集
((1,1), (1,3), (2,4), (2,3), (3,1))

所以combineByKey的执行将是这个样子?:

1)首先,它会创建一个累加器(1,1)
2)然后,当它遇到一个具有相同键(1)的元组时,它会将键值添加到一起?因此,当遇到(1,3)时,密钥1的新累加器将看起来像(2,2)。由于它添加了(1,1) and (1,3)的密钥,并且由于有两个元组和密钥1,所以它将在(2,2)中放置一个2(在右侧)。
3)然后它将继续为所有相同的密钥执行此操作。
4)然后最后它将从每个分区获取所有累加器,并将键(元组左侧)和发生次数(元组右侧)添加到一个元组中为每个键。

对不起,如果这是有点关闭,我仍然习惯于函数式编程方法!

回答

1

通常情况下,通过查看方法的类型和包含类可以得到很多清晰的结果。

PairRDDFunctions[K, V]

def combineByKey[C]( createCombiner: (V) ⇒ C, mergeValue: (C, V) ⇒ C, mergeCombiners: (C, C) ⇒ C): RDD[(K, C)]

我们与2型参数,可以一键和值,并与一个多,一个组合的方法的类。

系统会要求您提供功能

  • 把一个值转换成合
  • 把一个价值以及合并成一个组合
  • 转合和组合成一个组合

立即,这使得你的描述,加上密钥,因为我们还没有提供任何方法操作键。

对于每个键:

  1. 首先它将从一个值通过将值插入到tuple2的第一时隙以1在第二时隙(1, 1)创建组合器,在这种情况下。
  2. 然后它将通过将值添加到tuple2的第一个槽并递增第二个槽来将同一个关键字的每个附加值合并到组合器中。 (1 + 3,1 + 1)==(4,2)
  3. 然后,它将继续为同一个键的所有条目执行此操作。
  4. 然后最后它将从每个分区中获取所有累加器,并将值(元组左侧)和发生次数(元组右侧)添加到一个元组中每个键。

您的困惑可能源于您的密钥和值属于同一类型。如果您将密钥更改为Strings,则代码会进行编译,但如果您使用值执行此操作,则不会。

+0

哦,你指出了什么让我感到困惑,事实上,关键和价值都是数字!非常感谢您的详细解释,现在有道理! – LP496

+0

也只是为了澄清它在最后一次“从每个分区获取所有累加器并将值(元组左侧)和发生次数(元组右侧)添加到每个键一个元组“。它知道每个键值对来自不同分区的哪个键。因此,让我们从分区1和2分别获得关键字1的(5,4)和(3,2)。和(3,4)和(4,5)分别为关键2。所以spark会知道将key 1元组和key 2元组合在一起? – LP496

+1

是的,你的结构基本上是'(键:Int,(总和:Int,count; Int))' –