3
我现在遇到一个问题,我试图从多个文件中使用烫伤读取并使用单个文件创建输出。我的代码是这样的:使用烫伤法读取多个文件并输出一个SINGLE文件
def getFilesSource (paths: Seq[String]) = {
new MultipleTextLineFiles(paths: _*) {
override protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = {
val taps = goodHdfsPaths(hdfsMode).toList.map {
path => CastHfsTap (new Hfs (hdfsScheme, path, sinkMode))
}
taps.size match {
case 0 => {
CastHfsTap (new Hfs(hdfsScheme, hdfsPaths.head, sinkMode))
}
case 1 => taps.head
case _ => new ScaldingMultiSourceTap(taps)
}
}
}
}
但是当我运行这段代码,它分裂我的输出到许多文件,但是里面的数据非常少:短短K.相反,我希望能够聚集所有输出文件成一个单一的。
我烫码是:
val source = getFilesSource(mapped) // where mapped is a Sequence of valid HDFS paths (Seq [String])
TypedPipe.from(source).map(a => Try{
val json = JSON.parseObject(a)
(json.getInteger("prop1"), json.getInteger("prop2"), json.getBoolean("prop3"))
}.toOption).filter(a => a.nonEmpty)
.map(a => a.get)
.filter(a => !a._3)
.map (that => MyScaldingType (that._1, that._2))
.write(MyScaldingType.typedSink(typedArgs))
我想我必须重写型ScaldingMultiSourceTap的“sourceConfInit”的方法,但我不知道怎么写里面...
嗨@karthikcru,谢谢你的回答,听起来很有希望。今天早上我会试试这个。我希望,因为我在映射阶段执行过滤器(使用声明:.filter(a =>!a._3)),对于我的商业案例,将会有大量数据不会通过该过滤条件。剩余的东西将被发送到单个减速器。 –