谁能给出了如何在以下火花agrregate动作产生的(9,4)RDD.aggregate动作如何工作(即如何理解参数)?
val rdd = sc.parallelize(List(1,2,3,3))
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
res : (9,4)
谁能给出了如何在以下火花agrregate动作产生的(9,4)RDD.aggregate动作如何工作(即如何理解参数)?
val rdd = sc.parallelize(List(1,2,3,3))
rdd.aggregate((0, 0))
((x, y) =>
(x._1 + y, x._2 + 1),
(x, y) =>
(x._1 + y._1, x._2 + y._2))
res : (9,4)
这是星火2.1.0这里(应该没有多大关系,但是......)
转到the official documentation of aggregate
(又名scaladoc)和阅读:
使用给定的组合函数和中性的“零值”聚合每个分区的元素,然后聚合所有分区的结果。这个函数可以返回与RDD类型不同的结果类型U.因此,我们需要一个将T合并成U的操作,以及一个合并两个U的操作,就像在scala.TraversableOnce中一样。这两个函数都可以修改并返回它们的第一个参数,而不是创建一个新的U以避免内存分配。
签名是如下(除去隐式参数不特别有趣):
aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U): U
的scaladoc说:
零值为的累加结果的初始值seqOp运算符的每个分区以及combOp运算符的不同分区的组合结果的初始值 - 这通常是中性元素(例如,无列表拼接或0求和)
在你的情况,zeroValue
是(0, 0)
。
SEQOP用于分区
在你的情况中累积结果的运营商,seqOp
是(x, y) => (x._1 + y, x._2 + 1)
这是接受两对功能,遗憾的是名为x
和y
(我会打电话至少或者甚至使用模式匹配和部分功能,即case ((x1, y1), (x2, y2)) => ...
)。
鉴于你有n
分区(你可以检查出来使用rdd.getNumPartition
),seqOp
将被称为n
倍。
的scaladoc说:
combOp关联运营商使用的结果不同分区
这意味着combOp
将所有结果相结合的seqOp
和应用功能相结合:
(x, y) => (x._1 + y._1, x._2 + y._2)
它是ag ain写得很差,所以你看到太多,我甚至会拨打噪音。我会写出如下功能:
{ case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }
按照类型并给予适当的名称,最终一切都在斯卡拉变得更加容易;-)
结果基本上总说,一个详细的解释:我们希望有一个元组(A,B),其中一个是所有元素的总和和b是他们的数量。
这是通过初始化为(0,0)完成的,然后我们有两个功能:
第一个函数只是做加法,当我们在同一时间得到一个单一的元素,即元组是通过将值添加到第一个元素并向第二个元素添加1(count)来从单个元素更新。
第二功能合并两个结果,以便它只是做元件滴添加
让我们考虑输入数据的例子:
让说1,2是在分区1和3,图3是在隔壁3
分区1计算
分区1将STA rt与(0,0)。
然后第一个函数开始工作。
当我们添加一个我们得到(1,1)。第一个元素是总和(0 + y,其中y是1),第二个元素是计数(0 + 1)。
现在我们加2,所以我们得到(1 + 2,1 + 1)=(3,2)。第一个元素是我们迄今为止所看到的值的总和,第二个元素是它们的数量。
分区2计算
在第二分区我们再次用(0,0)开始,然后我们从第二得到(3,1)从所述第一3和(6,2)。
合并的结果
现在第二个功能进场合并两个:通过总结这两个元素得到(9,4)
我们合并(3,2)和(6,2)