2017-06-07 104 views
0

我是一个新手,需要一些帮助来调试非常慢的火花性能。 我正在做转换,并且已经运行了2个多小时。火花性能很慢

scala> val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
hiveContext: org.apache.spark.sql.hive.HiveContext =  [email protected] 
scala> val t1_df = hiveContext.sql("select * from T1") 

scala> t1_df.registerTempTable("T1") 
warning: there was one deprecation warning; re-run with -deprecation for details 

scala> t1_df.count 
17/06/07 07:26:51 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf. 
res3: Long = 1732831 

scala> val t1_df1 = t1_df.dropDuplicates(Array("c1","c2","c3", "c4")) 

scala> df1.registerTempTable("ABC") 
warning: there was one deprecation warning; re-run with -deprecation for details 

scala> hiveContext.sql("select * from T1 where c1 not in (select c1 from ABC)").count 
[Stage 4:====================================================> (89 + 8)/97] 

我使用spark2.1.0并与250GB RAM中的每个节点7和64个虚拟核的亚马逊虚拟机集群上读取来自hive.2.1.1数据。有了这个庞大的资源,我期待这个简单的查询在1.7密尔recs飞行,但它的痛苦缓慢。 任何指针都会有很大的帮助。

更新: 添加解释计划:

scala> hiveContext.sql("select * from T1 where c1 not in (select c1 from ABC)").explain 
    == Physical Plan == 
    BroadcastNestedLoopJoin BuildRight, LeftAnti, (isnull((c1#26 = c1#26#1398)) || (c1#26 = c1#26#1398)) 
:- FileScan parquet default.t1_pq[cols 
more fields] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://<hostname>/user/hive/warehouse/atn_load_pq], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<hdr_msg_src:string,hdr_recv_tsmp:timestamp,hdr_desk_id:string,execprc:string,dreg:string,c... 
+- BroadcastExchange IdentityBroadcastMode 
    +- *HashAggregate(keys=[c1#26, c2#59, c3#60L, c4#82], functions=[]) 
     +- Exchange hashpartitioning(c1#26, c2#59, c3#60L, c4#82, 200) 
     +- *HashAggregate(keys=[c1#26, c2#59, c3#60L, c4#82], functions=[]) 
      +- *FileScan parquet default.atn_load_pq[c1#26,c2#59,c3#60L,c4#82] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://<hostname>/user/hive/warehouse/atn_load_pq], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c1:string,c2:string,c3:bigint,c4:string> 
+0

多少资源,你有分配('spark.executor.instances'和'spark.executor.cores'),确保你已经把一些合理的数字这些设置。此外,请在SparkUI中查看作业/阶段是否具有足够的并行性。 –

+2

你能为我们做一件事吗?添加命令以显示物理计划:'hiveContext.sql(“select * from T1 where c1 not in(select c1 from ABC)”).explain()' –

+0

Thiago,添加了解释计划。 – birjoossh

回答

0

虽然我觉得你的计数始终在您的查询是0,你可以尝试使用左抗加入,不要忘记缓存t1_df避免多次重新计算

val t1_df = hiveContext.sql("select * from T1").cache 

t1_df 
    .join(
    t1_df.dropDuplicates(Array("c1","c2","c3", "c4")), 
    Seq("c1"), 
    "leftanti" 
    ) 
    .count() 
+0

谢谢拉尔夫;但我认为查询计划器会自动将其转换为反向左连接 – birjoossh