要使用joinWith
您首先必须创建一个DataSet
,最有可能的是其中两个。要创建DataSet
,您需要创建一个与您的模式相匹配的案例类,并致电DataFrame.as[T]
,其中T
是您的案例类。所以:
case class KeyValue(key: Int, value: String)
val df = Seq((1,"asdf"),(2,"34234")).toDF("key", "value")
val ds = df.as[KeyValue]
// org.apache.spark.sql.Dataset[KeyValue] = [key: int, value: string]
您也可以跳过的情况下类,并使用一个元组:
val tupDs = df.as[(Int,String)]
// org.apache.spark.sql.Dataset[(Int, String)] = [_1: int, _2: string]
然后,如果你有另外的案例类/ DF,像这样说:
case class Nums(key: Int, num1: Double, num2: Long)
val df2 = Seq((1,7.7,101L),(2,1.2,10L)).toDF("key","num1","num2")
val ds2 = df2.as[Nums]
// org.apache.spark.sql.Dataset[Nums] = [key: int, num1: double, num2: bigint]
然后,而join
和joinWith
的语法是相似的,结果是不同的:
df.join(df2, df.col("key") === df2.col("key")).show
// +---+-----+---+----+----+
// |key|value|key|num1|num2|
// +---+-----+---+----+----+
// | 1| asdf| 1| 7.7| 101|
// | 2|34234| 2| 1.2| 10|
// +---+-----+---+----+----+
ds.joinWith(ds2, df.col("key") === df2.col("key")).show
// +---------+-----------+
// | _1| _2|
// +---------+-----------+
// | [1,asdf]|[1,7.7,101]|
// |[2,34234]| [2,1.2,10]|
// +---------+-----------+
如您所见,joinWith
将对象保留为元组的一部分,而join
则将列平整为单个名称空间。 (这将导致在上述情况下的问题,因为列名“关键”是重复的)
奇怪的是,我必须使用df.col("key")
和df2.col("key")
创造了条件加盟ds
和ds2
- 如果你只使用col("key")
在任何一方它都不起作用,并且ds.col(...)
不存在。然而,使用原始的df.col("key")
也有诀窍。
详细说明申请的情况下类或获取数据。只是一个混乱。有没有更好的方法来编写类型化的连接条件。例如, df.col(“key”)是否可以有更安全的类型,可以在编译时解析“key”的正确性。 –
我完全同意,基于这种语法在创建数据集时没有用处,那么好处在哪里?我无法克服这个事实,没有类型的选择..这样的可惜! – Sparky