我有一组使用嵌套键值对的大型压缩json文件。 json对象中有大约70-80个键(和子键),但是,我只对几个键感兴趣。我想用Spark SQL查询json文件,只挑出我感兴趣的键值对,并将它们输出到一组csv文件。处理一个大小为170MB的压缩json文件需要大约5分钟的时间。我只是想知道是否有任何方法来优化这个过程。或者除了Spark这样的工作,还有其他更好的工具吗?谢谢!快速处理Spark中的json文件的方法
下面是我用的是Scala代码的快照:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
row => {
val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
row(8), row(9), row(10), row(11))
}
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
我;猜大部分时间的推移读/解压缩和写作,这不能并行化。添加分配作业和收集结果的开销,我的猜测是使用Spark会让你放慢速度。为什么未分析的行的“重新分配”? –
如果你只是想改变你的数据。你不需要所有的SparkSQL功能。只要坚持RDD的。使用像PlayJson这样的快速json库来解析json。修改并转储它。 –
除非明确要求,否则请勿对RDD进行重新分区。 –