2014-12-28 133 views
1

我在PostgreSQL中拥有100万行和100列以上的数据源,并且我想使用Spark SQL,因此我想将此数据源转换为SchemaRDD将PostgreSQL数据库加载到SchemaRDD

两种方法在Spark SQL Programming Guide引入, 一种是通过反射,这意味着我需要定义:

case class Row(Var1: Int, Var2: String, ...) 

这是繁琐的,因为我有超过100个列。

另一种办法是“编程指定模式”,这意味着我需要定义:

val schema = 
    StructType(
    Seq(StructField("Var1", IntegerType), StructField("Var2", StringType), ...)) 

这对我来说也是乏味。

def extractValues(r: ResultSet) = { 
    (r.getInt("Var1"), r.getString("Var2"), ...) 
} 
val dbRDD = new JdbcRDD(sc, createConnection, 
    "SELECT * FROM PostgreSQL OFFSET ? LIMIT ?", 
    0, 1000000, 1, extractValues) 

这个API:

事实上,因为我加载使用JdbcRDD类我PostgreSQL数据库,但我发现我还需要在mapRow参数JdbcRDD构造的,它看起来像定义模式仍然有一个问题仍然要求我自己创建架构,更糟糕的是,我需要重做类似的东西来将这个JdbcRDD转换为SchemaRDD,那将是非常笨拙的代码。

所以我想知道什么是这个任务的最佳方法?

回答

2

您需要支持的数据类型数量有限。为什么不使用

java.sql.ResultSetMetaData 

例如,

val rs = jdbcStatement.executeQuery("select * from myTable limit 1") 
val rmeta = rs.getMetaData 

要读取一行,然后为每个列动态生成所需的StructField。

您将需要一个case语句来处理

val myStructFields = for (cx <- 0 until rmeta.getColumnCount) { 
     val jdbcType = rmeta.getColumnType(cx) 
     } yield StructField(rmeta.getColumnName(cx),jdbcToSparkType(jdbcType)) 

val mySchema = StructType(myStructFields.toSeq) 

哪里jdbcToSparkType是大致如下:

def jdbcToSparkType(jdbcType: Int) = { 
    jdbcType match { 
     case 4 => InteegerType 
     case 6 => FloatType 
     .. 
    } 

UPDATE要生成RDD [行]:你会遵循类似的模式。在这种情况下,你会

val rows = for (rs.next) { 
    row = jdbcToSpark(rs) 
    } yield row 

val rowRDD = sc.parallelize(rows) 

其中

def jdbcToSpark(rs: ResultSet) = { 
    var rowSeq = Seq[Any]() 
    for (cx <- 0 to rs.getMetaData.getColumnCount) { 
    rs.getColumnType(cx) match { 
     case 4 => rowSeq :+ rs.getInt(cx) 
      .. 
    } 
    } 
    Row.fromSeq(rowSeq) 
} 

然后 VAL行

相关问题