2017-02-14 20 views
0

我的日志文件基于创建的日志文件的日期进入不同的目录。在Spark中结合多个目录中的日志

例如

> /mypath/2017/01/20/... 
. 
. 
. 
> /mypath/2017/02/13/... 
> /mypath/2017/02/14/... 

我想所有这些日志文件合并成使用pyspark一个单一RDD,这样我可以做这个主文件的集合体。

到目前为止,我已经取得了单独的目录,名为sqlContext并使用Union来加入特定日期的所有日志文件。

DF1 = (sqlContext.read.schema(schema).json("/mypath/2017/02/13")).union(sqlContext.read.schema(schema).json("/mypath/2017/02/14")) 

是否有一种简单的方法通过指定日期范围内的日志文件来获取主rdd? (即从2017/01/20到2017/02/14)

我很新的火花,请纠正我,如果我在任何一步错了。

+0

另外,如果我想过滤器的基础在我加入所有这些日志(比如说DF1)之后,在“Type”列上。这样做的最佳过程是什么? (我通常使用DF1.filter())。有没有其他有效的方法? – SpaceOddity

+0

sqlContext.read.schema(schema).json(“/ mypath/2017/02/[13-14]”))不起作用。它说“非法文件模式:在索引4附近有非法字符范围” – SpaceOddity

回答

1

如果你坚持使用sqlContext然后一个简单的解决方案将是确定如果你想将列出所有的文件输入目录

case class FileWithDate(basePath: String, year: Int, month: Int, day: Int) { 
def path = s"${basePath}/${year}/${month}/${day}" 
} 

def listFileSources() : List[FileWithDate] = ??? // implement here 

内工会从您可以将源的所有dataframes的方法像这样做:

// create an empty dataframe with the strucutre for the json 
val files = listSources() 
val allDFs = files.foldLeft(emptyDF){case (df, f) => df.union(sqlContext.read.schema(schema).json(f.path))} 

如果要筛选按日期输入文件,那么这将是容易的。事情是这样的

files.filter(_.year == 2016 && (_.month >=2 || _.month <=3)) 

另一种解决方案将是增加自己的dataframes(把附加列)用年,月,日和做所有的业务逻辑上新dataframes

+0

我认为用最少的Scala知识可以实现listFileSources方法。你应该做的是获取mypath文件夹中的所有文件(对子文件夹进行递归迭代)并创建FileWithDate类型的对象。这些对象被追加到方法返回的列表中。 – dumitru