0

随着数据帧称为lastTail,我可以重复这样的:星火斯卡拉据帧单列转换成JSON用于PostrgeSQL插入

import scalikejdbc._ 
// ... 
// Do Kafka Streaming to create DataFrame lastTail 
// ... 

lastTail.printSchema 

lastTail.foreachPartition(iter => { 

// open database connection from connection pool 
// with scalikeJDBC (to PostgreSQL) 

    while(iter.hasNext) { 
    val item = iter.next() 
    println("****") 
    println(item.getClass) 
    println(item.getAs("fileGid")) 
    println("Schema: "+item.schema) 
    println("String: "+item.toString()) 
    println("Seqnce: "+item.toSeq) 

    // convert this item into an XXX format (like JSON) 
    // write row to DB in the selected format 
    } 
}) 

这个输出“像”(与密文): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

和(只有一个迭代项 - 编辑,但希望具有足够好的语法)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

注:我在https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala上执行val metric = iter.sum这部分,但改为使用DataFrame。我也在使用“使用foreachRDD的设计模式”,参见http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

我如何转换这种 org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (见https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) 迭代项目为东西是随便写(JSON或...? - 我愿意)到PostgreSQL中。 (如果不是JSON,请建议如何将此值读回到DataFrame以供其他点使用)。

回答

0

那么我想出了一个不同的方法来解决这个问题。

val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString) 
val ltv = lastTail.toJSON 
val kvPair = ltk.zip(ltv) 

然后,我只是遍历RDD而不是DataFrame。

kvPair.foreachPartition(iter => { 
    while(iter.hasNext) { 
    val item = iter.next() 
    println(item.getClass) 
    println(item) 
    } 
}) 

的数据不谈,我得到class scala.Tuple2这使得存储在JDBC/PostgreSQL的KV对一个更简单的方法。

我相信还有其他方法可以解决问题。

+0

更好 - @ zero323指出我这个话题,以改善我的答案的第一部分(即删除zip) - http://stackoverflow.com/questions/36157810/spark-row-to-json – codeaperature