3

我是火花新手,使用有一个简单的火花申请星火SQL/hiveContext到:如何优化火花SQL运行它并行

  1. 从蜂巢表选择数据(1个十亿行)
  2. 做一些过滤,聚合包括ROW_NUMBER在窗函数通过选择第一行,组数()和max()等
  3. 结果写入到HBase的(几百万行)

我提交工作来运行它在纱线集群(100个执行器)上,速度很慢,当我查看Spark UI中的DAG可视化时,似乎只有蜂巢表扫描任务并行运行,其余步骤#2和#3仅运行在一个哪些实例可能应该能够优化并行化?

该应用程序看起来像:

步骤1:

val input = hiveContext 
    .sql(
    SELECT 
      user_id 
      , address 
      , age 
      , phone_number 
      , first_name 
      , last_name 
      , server_ts 
     FROM 
     (  
      SELECT 
       user_id 
       , address 
       , age 
       , phone_number 
       , first_name 
       , last_name 
       , server_ts 
       , row_number() over 
       (partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn 
      FROM 
      ( 
       SELECT 
        user_id 
        , address 
        , age 
        , phone_number 
        , first_name 
        , last_name 
        , server_ts 
       FROM 
        table 
       WHERE 
        phone_number <> '911' AND 
        server_date >= '2015-12-01' and server_date < '2016-01-01' AND 
        user_id IS NOT NULL AND 
        first_name IS NOT NULL AND 
        last_name IS NOT NULL AND 
        address IS NOT NULL AND 
        phone_number IS NOT NULL AND 
      ) all_rows 
     ) all_rows_with_row_number 
     WHERE rn = 1) 

val input_tbl = input.registerTempTable(input_tbl) 

步骤2:

val result = hiveContext.sql(
    SELECT state, 
     phone_number, 
     address, 
     COUNT(*) as hash_count, 
     MAX(server_ts) as latest_ts 
    FROM 
    (SELECT 
     udf_getState(address) as state 
     , user_id 
     , address 
     , age 
     , phone_number 
     , first_name 
     , last_name 
     , server_ts 
    FROM 
     input_tbl) input 
    WHERE state IS NOT NULL AND state != '' 
    GROUP BY state, phone_number, address) 

步骤3:

result.cache() 
result.map(x => ...).saveAsNewAPIHadoopDataset(conf) 

的DAG可视化看起来像: enter image description here

正如您所看到的,阶段0中的“过滤器”,“项目”和“交换”只在一个实例中运行,stage1和stage2也是如此,所以如果问题是几个问题和道歉哑:

  1. 在每个执行器进行数据混洗之后,Driver中是否会出现“Filter”,“Project”和“Exchange”?
  2. 什么代码映射到“过滤器”,“项目”和“交换”?
  3. 我怎么可以同时运行“过滤器”,“项目”和“交换”来优化性能?
  4. 可以同时运行stage1和stage2吗?
+0

您是否检查过hbase连接器允许下推谓词?如果是这样,而不是从HBase提取所有数据,那么可以让HBase帮助您至少过滤一些数据。主要的瓶颈通常是I/O和网络。你的代码思想中有些东西不清楚。你的桌子代表什么?它是使用HBase的数据创建的DataFrame吗?你的输入数据怎么样?恐怕描述有点宽泛。你愿意重温你的问题吗? – eliasah

+0

@eliasah,感谢您的评论。数据从Hive中提取并存储到Hbase中。同意瓶颈是I/O和网络,特别是有很多混洗 - 2TB输入数据和40GB混洗写入。我了解到,洗牌越少越好,但洗牌也必然与输入数据的大小有关。如果是这样,我想知道什么样的比例(洗牌/输入)会是一个很好的比例? –

回答

3

你不能正确读取DAG图 - 每个步骤是使用单箱并不意味着它不使用多个任务(因此核心)来计算可视化的事实步。

通过钻入舞台视图中可以看到每个步骤使用了多少个任务,该视图显示该阶段的所有任务。

例如,这里有一个样本DAG可视化与你相似:

enter image description here

你可以看到每个阶段由步骤“单”栏所示。

但是,如果我们看看下面的表格中,我们可以看到任务的每级数量:

enter image description here

其中之一是只使用2个任务,但其他使用220,这意味着数据被分割成220个分区,并且分区被并行处理,给定足够的可用资源。

如果深入了解该阶段,则可以再次看到它已使用了220个任务和所有任务的详细信息。从磁盘

enter image description here

只有任务读取数据显示在图中具有这些“多点”,以帮助您了解多少文件是如何被读取。

SO--正如Rashid的回答所暗示的,检查每个阶段的任务数量。

+0

只是为了增加上面的内容;​​最好是没有更少的阶段,因为这会表明更少的阶段。 “广泛的依赖性”,从而减少整个集群的数据移动。 – sourabh

+0

谢谢@Tzach Zohar。这里学到的绝对新东西!是否还有更多相关文章,我可以阅读/学习的幻灯片?您的意见已经非常丰富,只是想了解更多信息:) –

+0

谢谢@sourabh。在将数据写入HBase之前,我还尝试使用result.cache()和result.repartition(),以查看它们是否对性能优化有用。我没有看到缓存()反映在DAG图中,但重新分区在阶段2中显示在DAG中作为一个框。由于这是一个额外的步骤来执行,我想知道如何将有利于整体性能? –

1

这不是很明显,所以我会做以下事情以解决问题。

  1. 计算每个步骤的执行时间。
  2. 如果您的表格是文本格式,则第一步可能会很慢,如果数据以实木复合地格式存储在Hive中,那么spark通常会更好。
  3. 查看您的表是否由where子句中使用的列分区。
  4. 如果将数据保存到Hbase的速度很慢,那么您可能需要预先拆分hbase表,因为默认情况下数据存储在单个区域中。
  5. 看看阶段选项卡中的火花UI,看看有多少任务都开始为每一个阶段,也找数据地方一级描述here

希望,你将能够零的问题。

+0

谢谢@Rashid Ali。 1.每个步骤的执行时间在spark UI中显示。据推测,它是准确的提到:) –

+0

2.它的兽人文件 3.它是由几个在where子句中使用的列分区,而不是所有:( 4.我已经预先将数据按字符,例如{SPLITS => ['a','b','c'...]}作为行键是字符的组合,不确定我是否以这种方式做对了 5.有超过20K的数据读取任务配置单元,其他阶段的任务编号是2001年和64年,大概所有的任务都是并行运行:)我想知道如何让它更平行。 –

+0

哪一步需要更长的时间,需要进行一些优化,它是读取配置单元还是将数据存储到hbase中。关于hbase拆分,如果table没有正确拆分,那么数据可以写入一个或多个可能成为瓶颈的区域。分割是根据键分配数据的关键范围。记住大量的任务总是不好的迹象。例如,如果您的数据是分区的,并且查询正在从一个或两个分区读取数据,而不是大量的任务,则意味着将读取完整的表,这不是预期的。我希望这个能帮上忙。 –