我在PostgreSQL中拥有100万行和100列以上的数据源,并且我想使用Spark SQL,因此我想将此数据源转换为SchemaRDD
。将PostgreSQL数据库加载到SchemaRDD
两种方法在Spark SQL Programming Guide引入, 一种是通过反射,这意味着我需要定义:
case class Row(Var1: Int, Var2: String, ...)
这是繁琐的,因为我有超过100个列。
另一种办法是“编程指定模式”,这意味着我需要定义:
val schema =
StructType(
Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...))
这对我来说也是乏味。
def extractValues(r: ResultSet) = {
(r.getInt("Var1"), r.getString("Var2"), ...)
}
val dbRDD = new JdbcRDD(sc, createConnection,
"SELECT * FROM PostgreSQL OFFSET ? LIMIT ?",
0, 1000000, 1, extractValues)
这个API:
事实上,因为我加载使用JdbcRDD
类我PostgreSQL
数据库,但我发现我还需要在mapRow
参数JdbcRDD
构造的,它看起来像定义模式仍然有一个问题仍然要求我自己创建架构,更糟糕的是,我需要重做类似的东西来将这个JdbcRDD
转换为SchemaRDD
,那将是非常笨拙的代码。
所以我想知道什么是这个任务的最佳方法?