I am trying to find the median in Hadoop. The job fails with the following Java heap space error while executing MapReduce:
16/03/02 02:46:13 INFO mapreduce.Job: Task Id : attempt_1456904182817_0001_r_000412_0, Status : FAILED
Error: Java heap space
I went through a lot of posts addressing similar issues, but none of them worked. I also took help from:
I tried the following possible solutions:
- Increasing the Java heap size, as suggested in the posts above.
- Increasing the container size by changing the following property in yarn-site.xml:
yarn.scheduler.minimum-allocation-mb 1024
- Increasing the number of reducers to a bigger value like this:
job.setNumReduceTasks(1000);
However, none of the above worked for me, hence this post. I know median is not a good fit for Hadoop, but can anyone suggest anything that might help?
java version "1.8.0_60"
Hadoop version is 2.x
I have a 10-node cluster with 8 GB RAM and an 80 GB hard disk on each node.
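For what it's worth, `yarn.scheduler.minimum-allocation-mb` only raises the scheduler's minimum grant; the memory a reduce task actually gets is set by the job-level properties `mapreduce.reduce.memory.mb` (container size) and `mapreduce.reduce.java.opts` (JVM heap). A hedged starting point for 8 GB nodes, set on the `Configuration` before building the `Job` (the values below are illustrative, not something verified on this cluster):

```java
import org.apache.hadoop.conf.Configuration;

public class MemoryConf {
    public static Configuration build() {
        Configuration conf = new Configuration();
        // Illustrative sizing for 8 GB nodes: a 4 GB reducer container
        // with the JVM heap kept below the container size to leave
        // headroom for non-heap memory (otherwise YARN kills the container).
        conf.set("mapreduce.reduce.memory.mb", "4096");
        conf.set("mapreduce.reduce.java.opts", "-Xmx3276m");
        return conf;
    }
}
```

Passing the same values as `-D` flags on the command line only works if the driver goes through `ToolRunner`, which the posted `main()` does not, so setting them on the `Configuration` object is the safer route here.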
Here is the whole code:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.commons.math3.stat.descriptive.rank.Median;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class median_all_keys {
//Mapper
public static class map1 extends Mapper<LongWritable,Text,Text,DoubleWritable>{
public void map(LongWritable key, Text value, Context context)
throws IOException,InterruptedException{
String[] line= value.toString().split(",");
double col1=Double.parseDouble(line[6]);
double col2=Double.parseDouble(line[7]);
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key2"+"_"+line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key1"+"_"+line[1] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:6"), new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key2"+"_"+ line[2]+"_"+"Index:7"), new DoubleWritable(col2));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:6"),new DoubleWritable(col1));
context.write(new Text("Key0"+"_"+line[0] +","+"key1"+"_"+ line[1]+","+"key2"+"_"+line[2]+"_"+"Index:7"),new DoubleWritable(col2));
}
}
//Reducer
public static class sum_reduce extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
// HashMap<String,List<Float>> median_map = new HashMap<String,List<Float>>();
@SuppressWarnings({ "unchecked", "rawtypes" })
public void reduce(Text key,Iterable<DoubleWritable> value, Context context)
throws IOException,InterruptedException{
List<Double> values = new ArrayList<>();
for (DoubleWritable val: value){
values.add(val.get());
}
double res = calculate(values);
context.write(key, new DoubleWritable(res));
}
public static double calculate(List<Double> values) {
DescriptiveStatistics descriptiveStatistics = new DescriptiveStatistics();
for (Double value : values) {
descriptiveStatistics.addValue(value);
}
return descriptiveStatistics.getPercentile(50);
}
}
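The reducer above buffers every value for a key into an `ArrayList<Double>` before computing the percentile, so a single skewed key can pull millions of boxed doubles into one reduce call, which is a likely source of the heap error. If an approximate median is acceptable, one bounded-memory alternative is to keep only a fixed-size uniform random sample per key (reservoir sampling) and take the median of the sample. A minimal sketch in plain Java; the `MedianReservoir` class and its sizes are illustrative, not part of the original code:

```java
import java.util.Arrays;
import java.util.Random;

/** Keeps a fixed-size uniform random sample of a stream (reservoir sampling). */
public class MedianReservoir {
    private final double[] sample;  // fixed-size reservoir, never grows
    private final Random rng;
    private long seen = 0;          // total values offered so far

    public MedianReservoir(int capacity, long seed) {
        this.sample = new double[capacity];
        this.rng = new Random(seed);
    }

    /** Offer one value; O(1) time, O(capacity) memory regardless of stream length. */
    public void add(double value) {
        if (seen < sample.length) {
            sample[(int) seen] = value;          // reservoir not yet full
        } else {
            // keep the new value with probability capacity / (seen + 1)
            long j = (long) (rng.nextDouble() * (seen + 1));
            if (j < sample.length) {
                sample[(int) j] = value;
            }
        }
        seen++;
    }

    /** Median of the sample; exact whenever the whole stream fit in the reservoir. */
    public double median() {
        int n = (int) Math.min(seen, sample.length);
        double[] copy = Arrays.copyOf(sample, n);
        Arrays.sort(copy);
        return n % 2 == 1 ? copy[n / 2] : (copy[n / 2 - 1] + copy[n / 2]) / 2.0;
    }
}
```

In the reducer you would then replace the `List<Double>` with one `MedianReservoir` per `reduce()` call, feeding it from the value iterator and emitting `median()` at the end. The result is exact for keys whose value count fits in the reservoir and a random-sample estimate beyond that; if an exact median is required, raising the reducer's container and heap sizes is the remaining option.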
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
Job job = new Job(conf,"Sum for all keys");
//Driver
job.setJarByClass(median_all_keys.class);
//Mapper
job.setMapperClass(map1.class);
//Reducer
job.setReducerClass(sum_reduce.class);
//job.setCombinerClass(TestCombiner.class);
//Output key class for Mapper
job.setMapOutputKeyClass(Text.class);
//Output value class for Mapper
job.setMapOutputValueClass(DoubleWritable.class);
//Output key class for Reducer
job.setOutputKeyClass(Text.class);
job.setNumReduceTasks(1000);
//Output value class from Reducer
job.setOutputValueClass(DoubleWritable.class);
//Input Format class
job.setInputFormatClass(TextInputFormat.class);
//Final Output Format class
job.setOutputFormatClass(TextOutputFormat.class);
//Path variable
Path path = new Path(args[1]);
//input/output path
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
path.getFileSystem(conf).delete(path);
//exiting the job
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
I don't see any problem with the hardware specs. If you post your code, someone can help. Make sure you are not using any Java collection objects to store data in the mappers/reducers; that can cause a Java heap error. – srikanth
@srikanth I have added the code. –