2016-05-20 64 views
-1

我在独立模式下运行的火花与21个执行人,当我用我的sqlContext载入我的第一个SQL表,我的方式对其进行分区,以使数据在所有块中的完美分布几个执行人上的列划分是连续整数:星火歪斜数据

val brDF = sqlContext.load("jdbc", Map("url" -> srcurl, "dbtable" -> "basereading", "partitionColumn" -> "timeperiod", "lowerBound" ->"2", "upperBound" -> "35037", "numPartitions" -> "100")) 

此外,块被很好地分布在每个群集上,使得每个集群具有一个类似的内存使用情况。 不幸的是,当我更小的表IDOM像这样加入吧:

val mrDF = idoM.as('idom).join(brS1DF.as('br), $"idom.idoid" === $"br.meter") 

凡IDOM是1页的表和缓存结果是,RDD块存储上的集群变化的方式分配:

screenshot of spark UI executors sorted by number of RDD blocks

现在,有突然我的第四组更RDD块,并使用更多的内存。在检查每个RDD时,它们的块看起来仍然很好地分布,所以我的分区仍然很好,只是所有块似乎只想写在一个集群上,从而破坏了多个开始的目的。

我怀疑我的问题有类似 this question on the Apache mail list 的东西,但没有答案,所以任何将不胜感激。

回答

1

不知道你的数据,我认为要加入对密钥的分发是数据歪斜的原因。

运行idoM.groupBy("idoid").count.orderBy(desc("count")).showbrS1DF.groupBy("meter").count.orderBy(desc("count")).show可能会告诉你,有几个值有很多事件。

+1

我同意 - 您可以在连接后尝试“重新分配”以再次均匀分配数据。 –

0

问题同IDOM被装上一台机器,和火花试图保持数据局部性和做整体加入一台机器,这是在这种情况下,通过广播小表,较大的一个解决上。我确信idoM的键完全分布在正在连接的列上,不幸的是,重新分区并不能解决问题,因为spark仍然试图保持局部性,并且整个dataFrame仍然会在一台机器上结束。