2017-06-28 309 views
0

我有一个HDFS文件夹和两个250MB地板文件。 hadoop df块大小设置为128MB。 有以下代码:火花地板数据帧分区数

JavaSparkContext sparkContext = new JavaSparkContext(); 

    SQLContext sqlContext = new SQLContext(sparkContext); 
    DataFrame dataFrame = sqlContext.read().parquet("hdfs:////user/test/parquet-folder"); 
    LOGGER.info("Nr. of rdd partitions: {}", dataFrame.rdd().getNumPartitions()); 

    sparkContext.close(); 

我与spark.executor.instances = 3和spark.executor.cores = 4在集群上运行它。我可以看到,实木复合地板的文件的读取3个执行人X 4个核= 12次的任务中拆分:

spark.SparkContext: Starting job: parquet at VerySimpleJob.java:25 
    scheduler.DAGScheduler: Got job 0 (parquet at VerySimpleJob.java:25) with 12 output partitions 

然而,当我得到的数据框RDD(或创建RDD与toJavaRDD())调用,我只获得4个分区。这是由hdfs块大小控制 - 每个文件2个块,因此4个分区?

为什么这不匹配parquet(parent?)操作的分区数?

+0

回答如下,但总的来说你是对的 - 这都是关于HDFS块的大小。 – Zyoma

+0

基于@Zyoma的建议,我已经更新了代码,试图强制更小的分割,从而为数据框提供更多的输入分区。以下配置已更改:** parquet.block.size,mapred.max.split.size,mapred.min.split.size全部设置为Long.toString(8 * 1024 * 1024L)**。这*仍然*给我回4个分区 –

回答

1

当您使用Spark读取文件时,执行程序的数量和内核数量都不会以任何方式影响任务数量。分区数量(以及作为结果的任务)仅由输入中的块数决定。如果你有4个文件小于HDFS块大小 - 无论如何是4块,结果是4个分区。公式为number_of_files * number_of_blocks_in_file。因此,查看您的文件夹并计算它包含的文件数量以及每个文件的大小。这应该回答你的问题。

UPD:如果您没有手动重新分区数据帧以上的一切是真实的,如果你的数据帧不作为加入的结果,或者任何其他整理操作创建。

UPD:修复答案的详细信息。

+0

我的文件夹包含2个文件,每个文件有250MB。所以基本上你是说没有办法有更多的分区比块的数量(在这种情况下,4块128 MB)?为什么我会在最初阅读文件时看到创建的12个任务?或者我对这12个taska是什么的解释是错误的?在这里:https://stackoverflow.com/questions/27194333/how-to-split-parquet-files-into-many-partitions-in-spark有人建议用较小的parquet.block.size编写镶木地板文件可能会诀窍 - 但我试过设置,但没有运气。 –

+0

正确。您可以使用**重新分配**方法强制分配数量。 – Zyoma

+0

我知道重新分配是一种选择,但会触发混洗,这不是最佳选择。我在集群中有更多的核心*执行程序,我希望通过从初始读取操作中获取更多分区来理想地使用它。 –