2016-09-22 72 views
0

我想使用parquet格式将DStream保存到HDFS中。问题是我的case类使用joda.DateTime,而Spark SQL不支持这一点。例如:将case类的DStream转换为joda.DateTime转换为Spark DataFrame

case class Log (timestamp: DateTime, ...dozen of other fields here...) 

但我得到的错误:java.lang.UnsupportedOperationException:类型org.joda.time.DateTime模式试图RDD转换为DF时,不支持:

def output(logdstream: DStream[Log]) { 
     logdstream.foreachRDD(elem => { 
      val df = elem.toDF() 
      df.saveAsParquet(...) 
     }); 
    } 

我模型很复杂并且有很多字段,所以我不想编写不同的案例类来摆脱joda.DateTime。另一种选择是直接从json保存到实木复合地板,但这并不理想。是否有一种简单的方法可以将joda.DateTime自动转换为与spark一起使用的sql.Timestamp(转换为Spark的数据框)。

谢谢。

回答

0

这是一个有点冗长,但你一试映射登录到SQL星火行:

logdstream.foreachRDD(rdd => { 
    rdd.map(log => Row(
    log.timestamp.toDate, 
    log.field2, 
    ... 
)).toDF().saveAsParquest(...) 
}) 
+0

嗨,不知道如果我理解正确。但是错误发生在语句中:val df = elem.toDF();换句话说,我无法使用.toDF()函数将RDD [Log]转换为数据框。你建议的解决方案似乎假定df已经可用? – auxdx

+0

你说得对,我错过了。我改变了答案。 – bear911

相关问题