0
我想使用parquet格式将DStream保存到HDFS中。问题是我的case类使用joda.DateTime,而Spark SQL不支持这一点。例如:将case类的DStream转换为joda.DateTime转换为Spark DataFrame
case class Log (timestamp: DateTime, ...dozen of other fields here...)
但我得到的错误:java.lang.UnsupportedOperationException:类型org.joda.time.DateTime模式试图RDD转换为DF时,不支持:
def output(logdstream: DStream[Log]) {
logdstream.foreachRDD(elem => {
val df = elem.toDF()
df.saveAsParquet(...)
});
}
我模型很复杂并且有很多字段,所以我不想编写不同的案例类来摆脱joda.DateTime。另一种选择是直接从json保存到实木复合地板,但这并不理想。是否有一种简单的方法可以将joda.DateTime自动转换为与spark一起使用的sql.Timestamp(转换为Spark的数据框)。
谢谢。
嗨,不知道如果我理解正确。但是错误发生在语句中:val df = elem.toDF();换句话说,我无法使用.toDF()函数将RDD [Log]转换为数据框。你建议的解决方案似乎假定df已经可用? – auxdx
你说得对,我错过了。我改变了答案。 – bear911