我正在使用Spark读取文本文件。每条线可以属于不同的案例类。将行转换为案例类描述的对象后,我会将它们转换为数据框并写入HDFS(实木复合地板)。我遇到的问题是我最终得到了抽象类型的RDD,并且我需要将其限制为特定的案例类类型以应用toDF函数。将RDD [T]筛选到类型T的子类
到目前为止,我定义我的日志事件如下:
abstract class LogEvent
final case class Web(datetime: String, ...)
final case class OtherEvent(datetime: String ...)
我读我的文本文件,然后绘制线条针对一个模式匹配函数来创建一个RDD [的LogEvent]:
def convertToCase(e: List[String]): LogEvent= e match {
case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) =>
Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) =>
OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))
}
在这一点上,我希望限制到一个给定的案例类和转换为Spark数据框。喜欢的东西:
val events = spark.read.textFile(...)
.map(_.split(',').toList)
.map(convertToCase)
我那么想减少RDD [的LogEvent]到T类型的RDD,这可能是在集合{网络,OtherEvent}。这正是我正在努力的。应用一个谓词约束到case类的过滤器不会改变LogEvent的类型,这意味着我不能调用'toDF()',因为这必须在RDD [T]上调用,其中T是特定的案例类,而不是抽象类RDD [LogEvent]。
val webEvents = events.filter(someLogic).toDF()
我正在寻找一种方法,可以将通用RDD降至特定案例类的RDD。我试图通过不使用isInstanceOf或asInstanceOf来保持类型安全的同时实现此目的。
有没有简单的解决方案呢?或者我以错误的方式处理问题?
在此先感谢。
谢谢!我偶然发现收集,并认为它会解决我的问题,但与Spark一起使用将不会将所有事件返回给驱动程序?我应该在体内指定,但我认为由于RDD实施,我无法使用收集。 – user3030878
'collect()'和'collect [U](pf:PartialFunction [T,U])'是RDD上两种截然不同的方法。只要你给方法一个部分函数参数(否则,编译器确实会假设你想收集到驱动程序),你就会安全的。 –
完美 - 谢谢 – user3030878