2017-03-20 103 views
1

我正在阅读社交网络的json文件为spark。我从这些数据框中获得了我为了获得配对而爆炸的数据。 这个过程很完美。稍后我想将其转换为RDD(用于GraphX),但创建RDD需要很长时间。火花数据帧转换为rdd需要很长时间

val social_network = spark.read.json(my/path) // 200MB 
val exploded_network = social_network. 
    withColumn("follower", explode($"followers")). 
    withColumn("id_follower", ($"follower").cast("long")). 
    withColumn("id_account", ($"account").cast("long")). 
    withColumn("relationship", lit(1)). 
    select("id_follower", "id_account", "relationship") 
val E1 = exploded_network.as[(VertexId, VertexId, Int)] 
val E2 = E1.rdd 

要检查的过程是如何运行的,我算在每一步

scala> exploded_network.count 
res0: Long = 18205814 // 3 seconds 

scala> E1.count 
res1: Long = 18205814 // 3 seconds 

scala> E2.count // 5.4 minutes 
res2: Long = 18205814 

为什么RDD转换以100倍?

回答

0

在Spark中,DataFrame是一个分布式数据集合,组织成命名列(表格格式)。它在概念上等同于关系数据库中的表或R/Python中的数据框,但具有更丰富的优化。而且由于其表格格式,它具有允许spark在后台运行优化次数的元数据。 DataFrame API使用Spark的高级优化,如钨执行引擎和催化剂优化器来更好地处理数据。

在RDD中,RDD不推断给定数据集的模式,并要求用户提供任何模式。另外,Rdd不能利用Spark优化器(如Catalyst优化器和钨执行引擎)(如上所述)。

所以DataFrame的性能比RDD好得多。在你的情况下,如果你必须使用RDD而不是数据帧,那么我建议在转换为rdd之前缓存数据帧。这应该会提高你的rdd性能。

val E1 = exploded_network.cache() 
val E2 = E1.rdd 

希望这有助于。

相关问题