2015-05-04 19 views
1

我的spark应用程序使用自定义hadoop输入格式处理文件(平均大小为20 MB)并将结果存储在HDFS中。如何使用hadoop自定义输入格式调整Spark应用程序

以下是代码片段。

Configuration conf = new Configuration(); 


JavaPairRDD<Text, Text> baseRDD = ctx 
    .newAPIHadoopFile(input, CustomInputFormat.class,Text.class, Text.class, conf); 

JavaRDD<myClass> mapPartitionsRDD = baseRDD 
    .mapPartitions(new FlatMapFunction<Iterator<Tuple2<Text, Text>>, myClass>() { 
     //my logic goes here 
    } 

//few more translformations 
result.saveAsTextFile(path); 

该应用程序为每个文件创建1个任务/分区,并处理相应的零件文件并将其存储在HDFS中。

即,10000输入文件被创建任务的万和10000个的部分文件存储在HDFS。

两个mapPartitions和baseRDD地图操作创建每个文件1个任务。

SO质疑 How to set the number of partitions for newAPIHadoopFile? 建议设置 conf.setInt("mapred.max.split.size", 4);配置不分区。

但是,当这个参数设置CPU利用最大,没有舞台,甚至很长一段时间之后,无法启动。

如果我没有设置此参数,那么应用程序将成功完成,如上所述。

如何设置分区的数量与newAPIHadoopFile和提高效率?

mapred.max.split.size选项,会发生什么?

============

更新: 与mapred.max.split.size选项,会发生什么?

在我的使用情况下,文件尺寸小和改变分割大小选项是这里无关紧要。这个SO

更多信息:Behavior of the parameter "mapred.min.split.size" in HDFS

+0

mapred.max.split.size指定以字节为单位的大小,我认为 –

回答

0

只需使用baseRDD.repartition(<a sane amount>).mapPartitions(...)。这会将结果操作移至更少的分区,尤其是在文件很小的情况下。

相关问题