2016-04-06 58 views
13

我正在尝试使用Spark Dataset API,但我在执行简单连接时遇到了一些问题。Spark数据集API - 加入

比方说,我有两个数据集字段:date | value,然后在DataFrame的情况下,我加入会是什么样子:

val dfA : DataFrame 
val dfB : DataFrame 

dfA.join(dfB, dfB("date") === dfA("date")) 

然而,对于Dataset存在.joinWith方法,但相同的方法不起作用:

val dfA : Dataset 
val dfB : Dataset 

dfA.joinWith(dfB, ?) 

.joinWith需要什么参数?

回答

19

要使用joinWith您首先必须创建一个DataSet,最有可能的是其中两个。要创建DataSet,您需要创建一个与您的模式相匹配的案例类,并致电DataFrame.as[T],其中T是您的案例类。所以:

case class KeyValue(key: Int, value: String) 
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value") 
val ds = df.as[KeyValue] 
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string] 

您也可以跳过的情况下类,并使用一个元组:

val tupDs = df.as[(Int,String)] 
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string] 

然后,如果你有另外的案例类/ DF,像这样说:

case class Nums(key: Int, num1: Double, num2: Long) 
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2") 
val ds2 = df2.as[Nums] 
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint] 

然后,而joinjoinWith的语法是相似的,结果是不同的:

df.join(df2, df.col("key") === df2.col("key")).show 
// +---+-----+---+----+----+ 
// |key|value|key|num1|num2| 
// +---+-----+---+----+----+ 
// | 1| asdf| 1| 7.7| 101| 
// | 2|34234| 2| 1.2| 10| 
// +---+-----+---+----+----+ 

ds.joinWith(ds2, df.col("key") === df2.col("key")).show 
// +---------+-----------+ 
// |  _1|   _2| 
// +---------+-----------+ 
// | [1,asdf]|[1,7.7,101]| 
// |[2,34234]| [2,1.2,10]| 
// +---------+-----------+ 

如您所见,joinWith将对象保留为元组的一部分,而join则将列平整为单个名称空间。 (这将导致在上述情况下的问题,因为列名“关键”是重复的)

奇怪的是,我必须使用df.col("key")df2.col("key")创造了条件加盟dsds2 - 如果你只使用col("key")在任何一方它都不起作用,并且ds.col(...)不存在。然而,使用原始的df.col("key")也有诀窍。

+3

详细说明申请的情况下类或获取数据。只是一个混乱。有没有更好的方法来编写类型化的连接条件。例如, df.col(“key”)是否可以有更安全的类型,可以在编译时解析“key”的正确性。 –

+5

我完全同意,基于这种语法在创建数据集时没有用处,那么好处在哪里?我无法克服这个事实,没有类型的选择..这样的可惜! – Sparky

2

在上面的例子中,你可以试试下面的选项 -

  • 定义的情况下,类为输出

    case class JoinOutput(key:Int, value:String, num1:Double, num2:Long)

  • 通过“Seq(”key“)”加入两个数据集,这将帮助您避免输出中的两个重复键列。这将有利于在下一步

    ds.join(ds2, Seq("key")).as[JoinOutput] res27: org.apache.spark.sql.Dataset[JoinOutput] = [key: int, value: string ... 2 more fields]

    scala> ds.join(ds2, Seq("key")).as[JoinOutput].show +---+-----+----+----+ |key|value|num1|num2| +---+-----+----+----+ | 1| asdf| 7.7| 101| | 2|34234| 1.2| 10| +---+-----+----+----+

+0

你没有专门回答这个问题,但Seq(“key”)提示帮助我解决了问题 – ImDarrenG