2017-10-18 95 views
0

我读了reducebyKey对大数据集来说是一个更好的选择,可以减少数据的混洗,并以这种方式提高性能。我想转换我的使用groupByKey。首先,它必须被转换为RDD:将groupByKey转换为reduceByKey

val linksNew = links.map(convertToRelationship) 
    .flatMap(bidirRelationship) 

链接是一个数据集和数据集的API没有reduceByKey。当使用reduceByKey时,相当于.groupByKey(_._1)

val linksfinal = linksNew.rdd.reduceByKey(???) 

实际代码:

enter image description here

一些数据集中的实际数据:

val biLinks = links 
    .map(convertToRelationship) 
    .flatMap(bidirRelationship) 
    .groupByKey(_._1) 
    .reduceGroups((left, right) => combineBidirerRelationships(left,right)) 
    .map(_._2._2) 

数据集的架构,只是groupByKey(_._1)之前使用enter image description here

+0

你的数据在'groupByKey()'之前的样子是怎么样的? – Shaido

+0

@Shaido喜欢这样''DataSet(String,Relationship)'然后应该如下 with groupByKey跟着,'KeyValueGroupedDataSet [String,(String,Relationship)]' – dedpo

+0

添加一个我相信会工作的答案,尽管我没有'对它进行测试,告诉我它是如何工作的。 – Shaido

回答

1

不知道它是否更有效率,但是,应该可以将其转换为reduceByKey,因为您在groupByKey之后直接执行reduceGroups

val biLinks = links 
    .map(convertToRelationship) 
    .flatMap(bidirRelationship) 
    .rdd 
    .map{row => (row.getAs[String](0), row.getAs[Relationship](1))} // See explanation below 
    .reduceByKey((left, right) => combineBidirerRelationships(left, right)) 
    .map(_._2._2) 

根据数据帧看起来像使用.rdd后,可以要求额外的转换:短例如,使用所提供的代码的一部分。从数据帧转换时,生成的rdd将是RDD[Row]。但是,要使reduceByKey()工作,需要类型RDD[(A,B)]的元组rdd,其中AB是类型(它们本身也可以是元组)。如何在rdd.map(...)转化可以structs工作


短的例子:

case class Relationship(a: Long, b: Long) 
val df = spark.createDataFrame(Seq((1, Relationship(3L, 2L)), (2, Relationship(20L, 7L)))).toDF() 
val rdd = df.rdd.map{ row => (row.getAs[String](0), row.getAs[Relationship](1))} 

这使所需的元组RDD类型,这里RDD[(String, Relationship)]

+0

这两个添加,'col1:String,col2:Relationship' 当我打印我们试图转换的模式,他们是struct1和striuct2,例如struct2是关系案例类创建的数据? – dedpo

1

我看到,reducebyKey是一个更好的选择大型数据集,以减少洗牌和或洗牌减少方面,并提高性能。

不是。你在混淆“旧”RDD API,其中groupByKey有不同的语义。

Dataset API groupByKey + reduceGroups在旧API中使用与reduceByKey类似的执行模型。事实上,转换为RDD时使用效率较低的洗牌机制,代价非常高昂,所以你只会让情况变得更糟。

相关问题