2017-05-04 16 views
1

我正在使用Spark读取文本文件。每条线可以属于不同的案例类。将行转换为案例类描述的对象后,我会将它们转换为数据框并写入HDFS(实木复合地板)。我遇到的问题是我最终得到了抽象类型的RDD,并且我需要将其限制为特定的案例类类型以应用toDF函数。将RDD [T]筛选到类型T的子类

到目前为止,我定义我的日志事件如下:

abstract class LogEvent 
final case class Web(datetime: String, ...) 
final case class OtherEvent(datetime: String ...) 

我读我的文本文件,然后绘制线条针对一个模式匹配函数来创建一个RDD [的LogEvent]:

def convertToCase(e: List[String]): LogEvent= e match { 
    case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) => 
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
    case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) => 
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
} 

在这一点上,我希望限制到一个给定的案例类和转换为Spark数据框。喜欢的东西:

val events = spark.read.textFile(...) 
    .map(_.split(',').toList) 
    .map(convertToCase) 

我那么想减少RDD [的LogEvent]到T类型的RDD,这可能是在集合{网络,OtherEvent}。这正是我正在努力的。应用一个谓词约束到case类的过滤器不会改变LogEvent的类型,这意味着我不能调用'toDF()',因为这必须在RDD [T]上调用,其中T是特定的案例类,而不是抽象类RDD [LogEvent]。

val webEvents = events.filter(someLogic).toDF() 

我正在寻找一种方法,可以将通用RDD降至特定案例类的RDD。我试图通过不使用isInstanceOf或asInstanceOf来保持类型安全的同时实现此目的。

有没有简单的解决方案呢?或者我以错误的方式处理问题?

在此先感谢。

回答

3

应该使用collect(f: PartialFunction[T, U]): RDD[U]方法(不与collect(): Array[T]它发送结果为阵列到驱动器混淆):

val webEvents = events.collect{ 
    case w: Web => w 
}.toDF() 

collectmapfilter之间的混合:如果输入匹配在模式匹配中给出的情况之一,它将输出部分函数给出的值。否则,它将简单地忽略(即过滤掉)输入。

请注意,您应该也可以为convertToCase执行此操作,因为您定义的模式匹配不完整,并且如果遇到意外事件或损坏的行,则可能会在运行时遇到错误。要做到这一点的正确方法是定义

val convertToCase: PartialFunction[List[String], LogEvent] = { 
    case List(_, _, _, "WEB", _, _, _, _, _, _, _, _, _) => 
    Web(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
    case List(_, _, _, "OTHEREVENT", _, _, _, _, _, _, _, _) => 
    OtherEvent(getDate(e.head), getTime(e.head), e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)) 
} 

然后,带着collect(convertToCase)取代map(convertToCase)

+0

谢谢!我偶然发现收集,并认为它会解决我的问题,但与Spark一起使用将不会将所有事件返回给驱动程序?我应该在体内指定,但我认为由于RDD实施,我无法使用收集。 – user3030878

+1

'collect()'和'collect [U](pf:PartialFunction [T,U])'是RDD上两种截然不同的方法。只要你给方法一个部分函数参数(否则,编译器确实会假设你想收集到驱动程序),你就会安全的。 –

+0

完美 - 谢谢 – user3030878