0
我目前正在尝试学习hadoop编程,并编写处理一个映射器内两个输入源的程序。这项工作与mapside-join问题类似。hadoop设置方法映射器
所以,我第一次使用分布式缓存,但是,它不工作。因此我第二次使用了setup()函数。它在单个PC上以本地执行模式运行良好,但在集群环境中无法运行。
我完全不知道原因。
如果我们使用setup()函数,集群是否有配置?
而下面是我的代码的一部分。这部分是体现迭代工作的工作驱动程序。
public int run(String[] arg0) throws Exception {
// TODO Auto-generated method stub
int iteration = 1;
Configuration conf = new Configuration();
Path in = new Path(arg0[0]);
Path out = new Path(arg0[1]+"iteration_"+iteration);
conf.set("conf.threshold", arg0[2]);
Job job = new Job(conf, "Test");
job.setJarByClass(getClass());
job.setMapperClass(FirstMap.class);
job.setReducerClass(FirstReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out);
job.waitForCompletion(true);
// start second job
// long counter = 4;//job.getCounters().findCounter(SecondReduce.Counter.CONVERGED).getValue();
String PriorPath = out.toString();
boolean Updates = true;
while (Updates) {
iteration ++;
conf = new Configuration();
Path out2 = new Path(arg0[1]+"iteration_"+iteration);
conf.set("prior.job.out", PriorPath);
conf.set("conf.iteration", iteration+"");
job = new Job(conf, "job"+iteration);
job.setJarByClass(getClass());
job.setMapperClass(SecondMap.class);
job.setReducerClass(SecondReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, in);
FileOutputFormat.setOutputPath(job, out2);
job.waitForCompletion(true);
PriorPath = out2.toString();
long counter = job.getCounters().findCounter(Counter.CONVERGED).getValue();
Updates = (counter > 0);
System.out.println("counter : " + counter);
}
return 0;
}
此外,包含设置功能的映射器如下。
public static class SecondMap extends
Mapper<LongWritable, Text, Text, IntWritable> {
IntWritable one = new IntWritable(1);
Vector<String> Vec = new Vector<String>();
Vector<String> Gen = new Vector<String>();
int iteration;
@Override
public void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Configuration conf = context.getConfiguration();
Path Cand = new Path(conf.get("prior.job.out"));
// iteration = Integer.parseInt(conf.get("conf.iteration"));
String iter = conf.get("conf.iteration");
iteration = Integer.parseInt(iter);
try {
FileSystem fs = FileSystem.get(conf);
FileStatus[] status = fs.listStatus(Cand);
for (int i = 0; i < status.length; i++) {
BufferedReader br = new BufferedReader(
new InputStreamReader(fs.open(status[i].getPath())));
String line;
line = br.readLine();
while (line != null) {
System.out.println(line);
Vec.add(line);
line = br.readLine();
}
}
} catch (Exception e) {
System.out.println("File not found");
}
Gen = GenerateCandidate(Vec, iteration);
}
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// something with CandGen
}
}
}
任何有此经验的人?
你是什么意思你的设置功能不起作用?发生什么事?此外,您不应该尝试避免分布式缓存,因为它看起来不太好。它适用于我们大多数人。 –
感谢您的快速回复。不工作的设置功能意味着存储到“Vec”中的值在集群中没有任何意义,尽管它在PC上运行良好(我使用相同的程序和输入做了简单的测试)。 –
此外,当使用分布式缓存时,如果我通过命令输入缓存路径,此应用程序将起作用。这意味着修改程序中的路径会导致我不想要的结果。 –