2016-03-18 49 views
1

我正在使用spark处理一个非常大的数据集。数据存储为avro文件。数据也被组织在一个目录结构中(/ input/yyyy/MM/dd/HH /)。因此,例如今天的avro文件将位于/ input/2016/03/18/00至/ input/2016/03/18/23Spark创建100个空的avro文件

现在如果我处理最近2年的数据,许多avro文件被处理。

的数据处理代码如下

val inputRDD = sc.load("/input", "com.databricks.spark.avro").rdd 
val outputRDD = inputRDD.map(foo).filter(_.isDefined).flatMap(x => x).join(anotherRDD).map { 
    case (a, (b, (c, d))) => (a, (b, c, d)) 
}.join(yetAnotherRDD).filter { 
    case (a, ((b, c, d), (e, f))) => Math.abs(a - b) <= 2000 
}.map { 
    case (a, ((b, c, d), (e, f))) => Row(a, d) 
} 
val outputDF = sc.createDataframe(outputRDD, outputSchema) 
outputDF.save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

现在,当我内部输出去使用色相。我看到181页,每页上我都看到很多空的avro文件。

并非所有的文件都是空的......但有太多的空文件。

如果我不想要空文件,该怎么办? (不诉诸于“收集”)

回答

1

每个输入文件至少会产生一个RDD(如果文件很大,我可能会在多个输入序列中读取并制作多个RDD)。 在你的应用程序中,你在这些RDD上执行过滤器,所以有可能相当多的RDD最终都是空的,因为它们的所有行都被过滤掉了。保存DataFrame时,每个RDD都将保存到不同的PART文件中,因此一个空的RDD将生成一个空的RDD文件。 要解决此问题,请使用.coalesce(n),这将缩小RDD的数量。 因此,尝试一下你的上线是这样的:

outputDF.coalesce(200).save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

中聚结,用的数量是高度依赖于数据的大小。如果RDD太多,由于与驱动程序之间的通信开销会导致很多性能损失;如果RDD太少,则可能不会使用所有可用的执行程序,这也会导致性能低于最佳性能。

+0

我检查了,但spark 1.3.0没有colease功能。我想他们稍后将它添加到数据框中。 –

+1

在这种情况下,您可以使用'.repartition(200)'。请注意,'.coalesce(200)'会在更高版本的Spark上执行得更好。 –