我是火花新手,使用有一个简单的火花申请星火SQL/hiveContext到:如何优化火花SQL运行它并行
- 从蜂巢表选择数据(1个十亿行)
- 做一些过滤,聚合包括ROW_NUMBER在窗函数通过选择第一行,组数()和max()等
- 结果写入到HBase的(几百万行)
我提交工作来运行它在纱线集群(100个执行器)上,速度很慢,当我查看Spark UI中的DAG可视化时,似乎只有蜂巢表扫描任务并行运行,其余步骤#2和#3仅运行在一个哪些实例可能应该能够优化并行化?
该应用程序看起来像:
步骤1:
val input = hiveContext
.sql(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
, row_number() over
(partition by user_id, address, phone_number, first_name, last_name order by user_id, address, phone_number, first_name, last_name, server_ts desc, age) AS rn
FROM
(
SELECT
user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
table
WHERE
phone_number <> '911' AND
server_date >= '2015-12-01' and server_date < '2016-01-01' AND
user_id IS NOT NULL AND
first_name IS NOT NULL AND
last_name IS NOT NULL AND
address IS NOT NULL AND
phone_number IS NOT NULL AND
) all_rows
) all_rows_with_row_number
WHERE rn = 1)
val input_tbl = input.registerTempTable(input_tbl)
步骤2:
val result = hiveContext.sql(
SELECT state,
phone_number,
address,
COUNT(*) as hash_count,
MAX(server_ts) as latest_ts
FROM
(SELECT
udf_getState(address) as state
, user_id
, address
, age
, phone_number
, first_name
, last_name
, server_ts
FROM
input_tbl) input
WHERE state IS NOT NULL AND state != ''
GROUP BY state, phone_number, address)
步骤3:
result.cache()
result.map(x => ...).saveAsNewAPIHadoopDataset(conf)
正如您所看到的,阶段0中的“过滤器”,“项目”和“交换”只在一个实例中运行,stage1和stage2也是如此,所以如果问题是几个问题和道歉哑:
- 在每个执行器进行数据混洗之后,Driver中是否会出现“Filter”,“Project”和“Exchange”?
- 什么代码映射到“过滤器”,“项目”和“交换”?
- 我怎么可以同时运行“过滤器”,“项目”和“交换”来优化性能?
- 可以同时运行stage1和stage2吗?
您是否检查过hbase连接器允许下推谓词?如果是这样,而不是从HBase提取所有数据,那么可以让HBase帮助您至少过滤一些数据。主要的瓶颈通常是I/O和网络。你的代码思想中有些东西不清楚。你的桌子代表什么?它是使用HBase的数据创建的DataFrame吗?你的输入数据怎么样?恐怕描述有点宽泛。你愿意重温你的问题吗? – eliasah
@eliasah,感谢您的评论。数据从Hive中提取并存储到Hbase中。同意瓶颈是I/O和网络,特别是有很多混洗 - 2TB输入数据和40GB混洗写入。我了解到,洗牌越少越好,但洗牌也必然与输入数据的大小有关。如果是这样,我想知道什么样的比例(洗牌/输入)会是一个很好的比例? –