2016-09-21 39 views

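Thread # spilling sort data of _ GB to disk

I want to write an ETL process that merges two datasets. I add a column to each dataset: rows from the fresh dataset get a 2 and rows from the old dataset get a 1. Then, if rows share a duplicate primary key, I drop the row that has 1 in the old/new column. I have tried writing this in several ways, most recently: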

orderBy(keys, desc(old/new)).dropDuplicates(keys) 

But on large datasets I always hit a massive slowdown, with messages like these:

16/09/21 20:31:45 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0 time so far) 
16/09/21 20:32:00 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1 time so far) 
16/09/21 20:32:16 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2 times so far) 
16/09/21 20:32:31 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3 times so far) 
16/09/21 20:32:47 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4 times so far) 
16/09/21 20:33:02 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5 times so far) 
16/09/21 20:33:18 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6 times so far) 
16/09/21 20:33:33 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7 times so far) 
16/09/21 20:33:49 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8 times so far) 
16/09/21 20:34:04 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9 times so far) 
16/09/21 20:34:19 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10 times so far) 
16/09/21 20:34:35 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11 times so far) 
16/09/21 20:34:50 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12 times so far) 
16/09/21 20:35:06 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13 times so far) 
16/09/21 20:35:21 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14 times so far) 
16/09/21 20:35:37 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15 times so far) 
16/09/21 20:35:52 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16 times so far) 
16/09/21 20:36:07 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17 times so far) 
16/09/21 20:36:23 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18 times so far) 
16/09/21 20:36:38 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19 times so far) 
16/09/21 20:36:53 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20 times so far) 
16/09/21 20:37:09 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21 times so far) 
16/09/21 20:37:24 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22 times so far) 
16/09/21 20:37:40 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23 times so far) 
16/09/21 20:37:55 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24 times so far) 
16/09/21 20:38:10 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25 times so far) 
16/09/21 20:38:25 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26 times so far) 
16/09/21 20:38:41 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27 times so far) 
16/09/21 20:38:56 INFO UnsafeExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28 times so far) 
16/09/21 20:39:25 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (0 time so far) 
16/09/21 20:39:45 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (1 time so far) 
16/09/21 20:40:05 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (2 times so far) 
16/09/21 20:40:26 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (3 times so far) 
16/09/21 20:40:46 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (4 times so far) 
16/09/21 20:41:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (5 times so far) 
16/09/21 20:41:27 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (6 times so far) 
16/09/21 20:41:47 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (7 times so far) 
16/09/21 20:42:07 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (8 times so far) 
16/09/21 20:42:28 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (9 times so far) 
16/09/21 20:42:49 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (10 times so far) 
16/09/21 20:43:09 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (11 times so far) 
16/09/21 20:43:30 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (12 times so far) 
16/09/21 20:43:50 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (13 times so far) 
16/09/21 20:44:11 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (14 times so far) 
16/09/21 20:44:31 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (15 times so far) 
16/09/21 20:44:52 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (16 times so far) 
16/09/21 20:45:13 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (17 times so far) 
16/09/21 20:45:33 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (18 times so far) 
16/09/21 20:45:53 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (19 times so far) 
16/09/21 20:46:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (20 times so far) 
16/09/21 20:46:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (21 times so far) 
16/09/21 20:46:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (22 times so far) 
16/09/21 20:47:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (23 times so far) 
16/09/21 20:47:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (24 times so far) 
16/09/21 20:47:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (25 times so far) 
16/09/21 20:48:14 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (26 times so far) 
16/09/21 20:48:34 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (27 times so far) 
16/09/21 20:48:54 INFO ShuffleExternalSorter: Thread 84 spilling sort data of 3.0 GB to disk (28 times so far) 

On inspecting the Spark UI, only one thread is still working; the rest have already finished.
Is it possible to spread this work across threads?

Answer


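The way you approach this problem amplifies, by design, any possible issue related to data skew. Because you start by ordering the data by key and indicator variable, you shuffle the data first, possibly creating highly unbalanced partitions. No reduction applied after that point can compensate for it.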
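
One way to check whether the sort really leaves the partitions unbalanced is to count the rows that land in each partition after the `orderBy`. The following is a minimal Scala sketch, not part of the original answer. The names `SkewCheck`, `rowsPerPartition`, `keys` and `flag` are hypothetical, and the call executes the full sort, so it may be worth trying on a sample first.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, desc}

object SkewCheck {
  // Returns (partition index, row count) pairs for the sorted data.
  // `keys` are the primary-key columns and `flag` is the old/new
  // indicator column; both names are assumptions for illustration.
  def rowsPerPartition(df: DataFrame, keys: Seq[String], flag: String): Array[(Int, Int)] = {
    val sortCols = keys.map(col) :+ desc(flag)
    df.orderBy(sortCols: _*)
      .rdd
      // Count the rows inside each partition without moving them.
      .mapPartitionsWithIndex((idx, rows) => Iterator((idx, rows.size)))
      .collect()
  }
}
```

If one count is far larger than the others, the single long-running task seen in the Spark UI is probably the one processing that partition.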

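There are at least two methods you can use to achieve the same result while fully benefiting from map-side reduction. I explained both in my answer to "SPARK DataFrame: select the first row of each group", so just to reiterate: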

  • You can use ordering on a struct to select the minimum / maximum row per group.
  • You can use the statically typed Dataset groupByKey followed by reduceGroups.
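
The two options above might look roughly like the following Scala sketch. It is an illustration under assumptions, not the code from the linked answer: the `Record` case class, its fields (`id`, `value`, `version`), and the names `DedupSketch`, `viaStructMax` and `viaReduceGroups` are all hypothetical, with `version` standing in for the old/new indicator (1 = old, 2 = fresh). The struct variant also assumes every non-key column has an orderable type, and whether `reduceGroups` combines on the map side may depend on the Spark version.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, max, struct}

// Hypothetical row type: `id` is the primary key, `version` the old/new flag.
case class Record(id: Long, value: String, version: Int)

object DedupSketch {
  // Option 1: take the max of a struct whose first field is the flag,
  // so the flag dominates the comparison; ties fall back to the payload.
  def viaStructMax(tagged: DataFrame, keys: Seq[String], flag: String): DataFrame = {
    val payload = tagged.columns.filterNot(c => keys.contains(c) || c == flag)
    val best = max(struct((col(flag) +: payload.map(col)): _*)).alias("best")
    tagged.groupBy(keys.map(col): _*)
      .agg(best)
      .select((keys.map(col) ++ payload.map(c => col(s"best.$c"))): _*)
  }

  // Option 2: typed groupByKey + reduceGroups, keeping the newer record.
  def viaReduceGroups(spark: SparkSession, tagged: Dataset[Record]): Dataset[Record] = {
    import spark.implicits._
    tagged
      .groupByKey((r: Record) => r.id)
      .reduceGroups((a: Record, b: Record) => if (a.version >= b.version) a else b)
      .map((kv: (Long, Record)) => kv._2)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("dedup-sketch")
      .getOrCreate()
    import spark.implicits._

    // Tiny made-up inputs: id 1 exists in both the old and the fresh data.
    val oldDs = Seq(Record(1L, "a-old", 1), Record(2L, "b-old", 1)).toDS()
    val freshDs = Seq(Record(1L, "a-new", 2), Record(3L, "c-new", 2)).toDS()
    val tagged = oldDs.union(freshDs)

    viaStructMax(tagged.toDF(), Seq("id"), "version").show()
    viaReduceGroups(spark, tagged).show()
    spark.stop()
  }
}
```

Neither variant sorts the whole dataset first, so each partition can reduce its own rows per key before anything is shuffled. Note that `viaStructMax` drops the flag column from its output, while `viaReduceGroups` keeps the full record.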

If I'm doing the orderBy on the primary key, how am I creating skew? I tried your Window.partitionBy approach and ran into problems, but I'll explore the other options you provided.


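A window function will do almost the same thing (repartition first), so it is not what you want here. 'orderBy' is more or less equivalent to 'partitionBy', so if some keys are particularly common, the result will be an uneven distribution. What you want in this case is an efficient map-side combine. It may not solve the problem, but it should at least improve overall performance. – zero323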


Right, but in this case my keys are primary / composite keys, so they should all be unique, and I'm still having the problem.