I have tried a lot, but I don't understand why my mapper's output record count is 0, even though I do emit output in the mapper. I need the data from every line more than once (I am working with large data), so I first experimented with a small file (graph.txt) containing the following:
1,2,4,6
2,10,3,7
3,6,5,8
4,7,7,9
5,13,9,9
Since the mapper processes the file line by line, I saw no other way than this: while map() is called the first (n-1) times I store the values from the file, and in the last map() call I do the processing. For each line in the file, I store its data in a row array. In the final map() call, I emit the output through output.collect(). I also use the setup() method to count the number of lines, since setup() is called once per mapper. Because the input file is small, only one mapper is launched here.
I have been stuck on this for a while and I am new to Hadoop; please suggest a solution. Thanks in advance. Here is the code.
Driver code -
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class primdriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(primdriver.class);
        conf.setJobName("primdriver");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(primmapper.class);
        //conf.setCombinerClass(Reduce.class);
        conf.setReducerClass(primreducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}
Mapper code -
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Mapper.Context;
public class primmapper extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, Text> {

    //private final static IntWritable one = new IntWritable(1);
    //private Text word = new Text();
    private int no_line = 0;
    private int i = 0;

    public void setup(Context context) throws IOException {
        Path pt = new Path("hdfs:/myinput/graph.txt"); // location of the file in HDFS
        FileSystem fs = FileSystem.get(new Configuration());
        BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
        String line = br.readLine();
        while (line != null) {
            no_line = no_line + 1;
            line = br.readLine();
        }
    }

    private String[][] row = new String[no_line][4];

    @Override
    public void map(LongWritable key, Text value,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        if (i < no_line - 1) {
            String[] s = value.toString().split(",");
            for (int j = 0; j < s.length; j++) {
                row[i][j] = (s[j]);
            }
            i = i + 1;
        } else {
            String[] s = value.toString().split(",");
            for (int j = 0; j < s.length; j++) {
                //row[i][j]=Integer.parseInt(s[j]);
            }
            for (int i = 0; i < no_line - 1; i++) {
                String a = row[i][0];
                String b = row[i][1] + "," + row[i][2] + "," + row[i][3];
                output.collect(new Text(a), new Text(b));
            }
        }
    }
}
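As an aside, there is a second problem hidden in the mapper, independent of whether setup() ever runs: the field initializer `private String[][] row = new String[no_line][4];` executes at construction time, before any setup/configure hook could set no_line, so the array is always allocated with length 0. A minimal plain-Java sketch of this initialization-order pitfall (no Hadoop required; the class and method names here are made up for illustration):

```java
// Plain-Java illustration of the initialization-order pitfall:
// field initializers run in the constructor, before any later
// setup()-style method can change the values they read.
class MapperSketch {
    private int no_line = 0;
    // This initializer runs at construction time, when no_line is still 0,
    // so the array is permanently allocated with length 0.
    private String[][] row = new String[no_line][4];

    // Hypothetical stand-in for Hadoop's setup()/configure() hook.
    void setup() {
        no_line = 5; // pretend we counted 5 lines in graph.txt
    }

    int rowCapacity() {
        return row.length;
    }
}

public class InitOrderDemo {
    public static void main(String[] args) {
        MapperSketch m = new MapperSketch();
        m.setup();
        // Even after setup(), row was already allocated with length 0:
        System.out.println(m.rowCapacity()); // prints 0
    }
}
```

So even if setup() were invoked and counted the lines correctly, every `row[i][j] = ...` assignment would throw an ArrayIndexOutOfBoundsException; the array has to be allocated inside the initialization hook, after no_line is known.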
Reducer code -
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class primreducer extends MapReduceBase implements
        Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        int a = 0, b = 0, c = 0;
        output.collect(new Text("kishan "), new Text("sharma"));
        while (values.hasNext()) {
            String val[] = (values.next().toString()).split(",");
            a = Integer.parseInt(val[0]);
            b = Integer.parseInt(val[1]);
            c = Integer.parseInt(val[2]);
        }
        output.collect(key, new Text(a + "," + b + "," + c));
    }
}
On the console I get this log -
[[email protected] workspace]$ hadoop jar hierarchical.jar primdriver
myinput/graph.txt cluster5
17/04/07 10:21:18 WARN mapred.JobClient: Use GenericOptionsParser for
parsing the arguments. Applications should implement Tool for the same.
17/04/07 10:21:18 WARN snappy.LoadSnappy: Snappy native library is available
17/04/07 10:21:18 INFO snappy.LoadSnappy: Snappy native library loaded
17/04/07 10:21:18 INFO mapred.FileInputFormat: Total input paths to process : 1
17/04/07 10:21:18 INFO mapred.JobClient: Running job: job_201704070816_0007
17/04/07 10:21:19 INFO mapred.JobClient: map 0% reduce 0%
17/04/07 10:22:21 INFO mapred.JobClient: map 100% reduce 0%
17/04/07 10:22:29 INFO mapred.JobClient: map 100% reduce 66%
17/04/07 10:22:53 INFO mapred.JobClient: map 100% reduce 100%
17/04/07 10:23:22 INFO mapred.JobClient: Job complete: job_201704070816_0007
17/04/07 10:23:22 INFO mapred.JobClient: Counters: 33
17/04/07 10:23:22 INFO mapred.JobClient: File System Counters
17/04/07 10:23:22 INFO mapred.JobClient: FILE: Number of bytes read=6
17/04/07 10:23:22 INFO mapred.JobClient: FILE: Number of bytes written=361924
17/04/07 10:23:22 INFO mapred.JobClient: FILE: Number of read operations=0
17/04/07 10:23:22 INFO mapred.JobClient: FILE: Number of large read operations=0
17/04/07 10:23:22 INFO mapred.JobClient: FILE: Number of write operations=0
17/04/07 10:23:22 INFO mapred.JobClient: HDFS: Number of bytes read=146
17/04/07 10:23:22 INFO mapred.JobClient: HDFS: Number of bytes written=0
17/04/07 10:23:22 INFO mapred.JobClient: HDFS: Number of read operations=3
17/04/07 10:23:22 INFO mapred.JobClient: HDFS: Number of large read operations=0
17/04/07 10:23:22 INFO mapred.JobClient: HDFS: Number of write operations=2
17/04/07 10:23:22 INFO mapred.JobClient: Job Counters
17/04/07 10:23:22 INFO mapred.JobClient: Launched map tasks=1
17/04/07 10:23:22 INFO mapred.JobClient: Launched reduce tasks=1
17/04/07 10:23:22 INFO mapred.JobClient: Data-local map tasks=1
17/04/07 10:23:22 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=90240
17/04/07 10:23:22 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=31777
17/04/07 10:23:22 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
17/04/07 10:23:22 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
17/04/07 10:23:22 INFO mapred.JobClient: Map-Reduce Framework
17/04/07 10:23:22 INFO mapred.JobClient: Map input records=5
17/04/07 10:23:22 INFO mapred.JobClient: Map output records=0
17/04/07 10:23:22 INFO mapred.JobClient: Map output bytes=0
17/04/07 10:23:22 INFO mapred.JobClient: Input split bytes=104
17/04/07 10:23:22 INFO mapred.JobClient: Combine input records=0
17/04/07 10:23:22 INFO mapred.JobClient: Combine output records=0
17/04/07 10:23:22 INFO mapred.JobClient: Reduce input groups=0
17/04/07 10:23:22 INFO mapred.JobClient: Reduce shuffle bytes=6
17/04/07 10:23:22 INFO mapred.JobClient: Reduce input records=0
17/04/07 10:23:22 INFO mapred.JobClient: Reduce output records=0
17/04/07 10:23:22 INFO mapred.JobClient: Spilled Records=0
17/04/07 10:23:22 INFO mapred.JobClient: CPU time spent (ms)=1240
17/04/07 10:23:22 INFO mapred.JobClient: Physical memory (bytes) snapshot=196472832
17/04/07 10:23:22 INFO mapred.JobClient: Virtual memory (bytes) snapshot=775897088
17/04/07 10:23:22 INFO mapred.JobClient: Total committed heap usage (bytes)=177016832
17/04/07 10:23:22 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
17/04/07 10:23:22 INFO mapred.JobClient: BYTES_READ=42
I have clarified the question now. I hope it is clear; please tell me what is wrong here. –
I have found the problem. Actually, after the setup() method is called, both i and no_line are still zero, so the if condition is never taken, and that is why no output comes from the mapper. Can you tell me why setup() is not working? –
@KISHANSHARMA either there is no such file at that path (check the logger for error messages) or the file is empty. It does not mean the if statement is not checked; it is checked, but the flow goes to the else branch and then does nothing, because no_line is 0. – vefthym
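To add to the answer above: a likely root cause is an API mismatch. The mapper extends MapReduceBase from the old org.apache.hadoop.mapred API, whose per-task initialization hook is configure(JobConf) (declared in JobConfigurable); setup(Context) belongs to the new org.apache.hadoop.mapreduce.Mapper API and is never invoked by the old runtime. So no_line stays 0, every call falls into the else branch, and nothing is ever emitted. A plain-Java simulation of that lifecycle (no Hadoop required; all class, method, and string names here are illustrative stand-ins, not real Hadoop signatures):

```java
// Simulates why setup(Context) is dead code under the old mapred API:
// the old-API task runner only knows about configure(...), so a method
// named setup(...) is just an unrelated extra method it never calls.
interface OldApiMapper {
    void configure(String conf); // stand-in for configure(JobConf)
    String map(String line);     // stand-in for map(key, value, ...)
}

class AskersMapper implements OldApiMapper {
    int no_line = 0;

    public void configure(String conf) {
        no_line = 5;  // THIS hook is what the old runtime actually calls
    }

    public void setup(Object context) {
        no_line = 99; // never invoked by the old-API runner below
    }

    public String map(String line) {
        return "no_line=" + no_line;
    }
}

public class LifecycleDemo {
    // Mimics the old-API task runner: it calls configure(), never setup().
    static String runOldApiTask(OldApiMapper m, String input) {
        m.configure("job.xml");
        return m.map(input);
    }

    public static void main(String[] args) {
        System.out.println(runOldApiTask(new AskersMapper(), "1,2,4,6"));
        // prints no_line=5 — configure() ran; setup() was ignored
    }
}
```

In the real mapper, the fix along these lines would be to override public void configure(JobConf job), count the lines there, and also allocate the row array inside that method instead of in a field initializer.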