2015-05-23 18 views
3

我加入了两个RDD rddArddB锄头Spark是否计划加入?

rddA有100个分区,rddB有500个分区。

我想了解join操作的机制。默认情况下,无论连接的顺序如何,我都会得到相同的分区结构;即rddA.join(rddB)和rddB.join(rddA)产生相同数量的分区,并且通过观察它使用较小的分区大小,100.我知道我可以通过使用rddA.join(rddB,500)来增加分区大小,但是我更关心在引擎盖下发生了什么以及为什么选择较小的尺寸。从观察结果来看,即使我重新划分小型rdd,它的分区仍然会被使用; Spark是否对密钥大小做过任何启发式分析?

我的另一个问题是我得到的偏斜程度。我的小分区最终以3,314个条目结束,而最大分区总数达到599,911,729个(键),总数为1,139,207个。两个RDD都使用默认分区程序,那么数据混洗是如何决定的? 我依稀记得,如果一个rdd有一个分区程序集,那么它将被使用的分区程序。是这样吗?这是“推荐”吗?

最后,请注意,我的两个rdd s都比较大(〜90GB),因此广播连接无济于事。相反,任何对join操作提供一些见解的方式都可能是要走的路。

PS。关于机制左右连接的任何细节都将是额外的好处:)

回答

5

虽然我还没有设法解释如何派生分区,但我确实发现了数据如何洗牌(这是我最初的问题)。一个连接有一些副作用:

洗牌/分区: 星火将散列分区“RDD”键,移动/“工人”之间分配。给定键(例如5)的每组值都将以单个'工作者'/ JVM结束。这意味着如果你的'join'有一个1..N的关系,并且N严重倾斜,你将会得到偏斜的分区和JVM堆(即一个'Partition'可能有Max(N),另一个Min(N) )。避免这种情况的唯一方法是尽可能使用'广播'或忍受这种行为。由于您的数据最初将平均分配,混洗的数量将取决于密钥散列。

重新分区: 继“倾斜”的加入,呼吁“重新划分”似乎分区之间均匀地重新分配数据。所以如果你有不可避免的偏斜问题,这是一件好事。请注意,这种转换会引发沉重的洗牌,但下面的操作会更快。这样做的缺点,虽然是不可控制的对象创建(见下文)

对象创建/堆污染: 你成功加入您的数据认为,重新划分将是一个好主意,重新平衡群集,但对于一些理由,'重新分配'触发'OOME'。会发生什么是最初连接的数据重新使用连接的对象。当你触发“重新分配”或任何其他涉及洗牌的“行动”时,一个额外的连接或'groupBy'(后跟一个'Action'),数据被序列化,所以你失去了对象的重用。一旦对象被反序列化,它们就成为新的实例。另外请注意,在序列化过程中,重复使用会丢失,因此压力会很大。所以,就我而言,1 ..1000000加入(其中1是我'沉重'的对象),将在任何触发洗牌的操作之后失败。

变通/调试:

  1. 我用“mapPartitionsWithIndex”调试分区大小通过返回单个项目“可迭代>”与每个分区的计数。这是非常有用的,因为您可以在“操作”之后看到“重新分区”的效果和分区的状态。
  2. 您可以在连接RDD上使用'countByKeyApprox'或'countByKey'来获得基数的感觉,然后分两步应用连接。为您的高基数密钥使用“广播”,为低基数密钥使用“加入”。将这些操作包装在'rdd.cache()'&'rdd.unpersist()'块中将显着加速此过程。虽然这可能会使代码变得复杂一些,但它会提供更好的性能,尤其是如果您执行后续操作时。另外请注意,如果您在每个“地图”中使用“广播”,要进行查找,您还将显着减少混排大小。
  3. 调用影响分区数量的其他操作的“重新分区”可能非常有用,但请注意,随机分配的大量数据将导致更多的异常,因为给定密钥的大集合将创建大分区,但其他分区的大小将为0.创建调试方法以获取分区大小将有助于您选择合适的大小。