
Why are map output records = 0, even when I output in the mapper?

I have tried a lot but cannot understand why my mapper's output record count is 0. I want my mapper to read the input lines more than once, because I am working with big data and need the data on each line more than once, so I first tried it with a small file (graph.txt) containing the following:

    1,2,4,6
    2,10,3,7
    3,6,5,8
    4,7,7,9
    5,13,9,9

However, since the mapper processes the file line by line, I saw no other way, so I first store all the values: the map() method is called and just buffers for the first (n-1) calls, and the actual processing is done in the last map() call. For each line of the file, I store its data in a row array. In the final map() call, the output is emitted through output.collect(). I also use the setup() method to count the number of lines, since setup() is called once for each mapper. Here, because the input file is small, only one mapper is launched.

I have been stuck on this for a while, and I am new to this; please suggest a solution. Thanks in advance. Here is the code.

Driver code -

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.TextOutputFormat;

    public class primdriver {
        public static void main(String[] args) throws Exception {
            JobConf conf = new JobConf(primdriver.class);
            conf.setJobName("primdriver");

            conf.setOutputKeyClass(Text.class);
            conf.setOutputValueClass(Text.class);

            conf.setMapperClass(primmapper.class);
            //conf.setCombinerClass(Reduce.class);
            conf.setReducerClass(primreducer.class);

            conf.setInputFormat(TextInputFormat.class);
            conf.setOutputFormat(TextOutputFormat.class);

            FileInputFormat.setInputPaths(conf, new Path(args[0]));
            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            JobClient.runJob(conf);
        }
    }

Mapper code -

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hadoop.mapreduce.Mapper.Context;

    public class primmapper extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
        //private final static IntWritable one = new IntWritable(1);
        //private Text word = new Text();
        private int no_line = 0;
        private int i = 0;

        public void setup(Context context) throws IOException {
            Path pt = new Path("hdfs:/myinput/graph.txt"); //location of the file in HDFS
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
            String line;
            line = br.readLine();
            while (line != null) {
                no_line = no_line + 1;
                line = br.readLine();
            }
        }

        private String[][] row = new String[no_line][4];

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            if (i < no_line - 1) {
                String[] s = value.toString().split(",");
                for (int j = 0; j < s.length; j++) {
                    row[i][j] = (s[j]);
                }
                i = i + 1;
            } else {
                String[] s = value.toString().split(",");
                for (int j = 0; j < s.length; j++) {
                    //row[i][j] = Integer.parseInt(s[j]);
                }
                for (int i = 0; i < no_line - 1; i++) {
                    String a = row[i][0];
                    String b = row[i][1] + "," + row[i][2] + "," + row[i][3];
                    output.collect(new Text(a), new Text(b));
                }
            }
        }
    }

Reducer code -

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class primreducer extends MapReduceBase implements
            Reducer<Text, Text, Text, Text> {
        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            int a = 0, b = 0, c = 0;
            output.collect(new Text("kishan "), new Text("sharma"));
            while (values.hasNext()) {
                String val[] = (values.next().toString()).split(",");
                a = Integer.parseInt(val[0]);
                b = Integer.parseInt(val[1]);
                c = Integer.parseInt(val[2]);
            }
            output.collect(key, new Text(a + "," + b + "," + c));
        }
    }

On the console I get this log -

    [[email protected] workspace]$ hadoop jar hierarchical.jar primdriver myinput/graph.txt cluster5
    17/04/07 10:21:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
    17/04/07 10:21:18 WARN snappy.LoadSnappy: Snappy native library is available 
    17/04/07 10:21:18 INFO snappy.LoadSnappy: Snappy native library loaded 
    17/04/07 10:21:18 INFO mapred.FileInputFormat: Total input paths to process : 1 
    17/04/07 10:21:18 INFO mapred.JobClient: Running job: job_201704070816_0007 
    17/04/07 10:21:19 INFO mapred.JobClient: map 0% reduce 0% 
    17/04/07 10:22:21 INFO mapred.JobClient: map 100% reduce 0% 
    17/04/07 10:22:29 INFO mapred.JobClient: map 100% reduce 66% 
    17/04/07 10:22:53 INFO mapred.JobClient: map 100% reduce 100% 
    17/04/07 10:23:22 INFO mapred.JobClient: Job complete: job_201704070816_0007 
    17/04/07 10:23:22 INFO mapred.JobClient: Counters: 33 
    17/04/07 10:23:22 INFO mapred.JobClient: File System Counters 
    17/04/07 10:23:22 INFO mapred.JobClient:  FILE: Number of bytes read=6 
    17/04/07 10:23:22 INFO mapred.JobClient:  FILE: Number of bytes written=361924 
    17/04/07 10:23:22 INFO mapred.JobClient:  FILE: Number of read operations=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  FILE: Number of large read operations=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  FILE: Number of write operations=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  HDFS: Number of bytes read=146 
    17/04/07 10:23:22 INFO mapred.JobClient:  HDFS: Number of bytes written=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  HDFS: Number of read operations=3 
    17/04/07 10:23:22 INFO mapred.JobClient:  HDFS: Number of large read operations=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  HDFS: Number of write operations=2 
    17/04/07 10:23:22 INFO mapred.JobClient: Job Counters 
    17/04/07 10:23:22 INFO mapred.JobClient:  Launched map tasks=1 
    17/04/07 10:23:22 INFO mapred.JobClient:  Launched reduce tasks=1 
    17/04/07 10:23:22 INFO mapred.JobClient:  Data-local map tasks=1 
    17/04/07 10:23:22 INFO mapred.JobClient:  Total time spent by all maps in occupied slots (ms)=90240 
    17/04/07 10:23:22 INFO mapred.JobClient:  Total time spent by all reduces in occupied slots (ms)=31777 
    17/04/07 10:23:22 INFO mapred.JobClient:  Total time spent by all maps waiting after reserving slots (ms)=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Total time spent by all reduces waiting after reserving slots (ms)=0 
    17/04/07 10:23:22 INFO mapred.JobClient: Map-Reduce Framework 
    17/04/07 10:23:22 INFO mapred.JobClient:  Map input records=5 
    17/04/07 10:23:22 INFO mapred.JobClient:  Map output records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Map output bytes=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Input split bytes=104 
    17/04/07 10:23:22 INFO mapred.JobClient:  Combine input records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Combine output records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Reduce input groups=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Reduce shuffle bytes=6 
    17/04/07 10:23:22 INFO mapred.JobClient:  Reduce input records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Reduce output records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  Spilled Records=0 
    17/04/07 10:23:22 INFO mapred.JobClient:  CPU time spent (ms)=1240 
    17/04/07 10:23:22 INFO mapred.JobClient:  Physical memory (bytes) snapshot=196472832 
    17/04/07 10:23:22 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=775897088 
    17/04/07 10:23:22 INFO mapred.JobClient:  Total committed heap usage (bytes)=177016832 
    17/04/07 10:23:22 INFO mapred.JobClient: org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter 
    17/04/07 10:23:22 INFO mapred.JobClient:  BYTES_READ=42 

Answer

When i is smaller than no_line - 1, you do not collect anything. That condition seems to always hold in your case, and this is why you see no map output records.

When you start processing the first record, no_line has already been initialized to its final value (the actual number of lines in the input file "hdfs:/myinput/graph.txt").

At that point, i is 0. Then, whenever the if condition holds, i becomes 1 in that specific mapper (not in all mappers)*. So i then has the value 1 (in that mapper), and it again has to be smaller than no_line - 1. It seems that your file graph.txt has more than 5 lines (I am guessing).

In short, setup() is executed once in each mapper, before any map() call.
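
Worth noting: the mapper above uses the old org.apache.hadoop.mapred API (MapReduceBase, implements Mapper), and that framework never invokes a setup(Context) method; setup(Context) belongs to the new org.apache.hadoop.mapreduce.Mapper. With the old API, per-task initialization goes in an overridden configure(JobConf). A minimal sketch of moving the line count there (same hardcoded path as in the question; the map() body is left empty here):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class primmapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        private int no_line = 0;
        private String[][] row;

        // configure(JobConf) is the old-API counterpart of setup(Context):
        // the framework calls it once per map task, before the first map() call.
        @Override
        public void configure(JobConf job) {
            try {
                Path pt = new Path("hdfs:/myinput/graph.txt"); // location of the file in HDFS
                FileSystem fs = FileSystem.get(new Configuration());
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
                for (String line = br.readLine(); line != null; line = br.readLine()) {
                    no_line = no_line + 1;
                }
                br.close();
                // allocate the buffer only now, once no_line holds the real line count
                // (a field initializer would run while no_line is still 0)
                row = new String[no_line][4];
            } catch (IOException e) {
                // configure(JobConf) is not declared to throw IOException
                throw new RuntimeException(e);
            }
        }

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
            // the buffering logic from the question would go here;
            // no_line and row are now initialized before this is first called
        }
    }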

I am not sure what you are trying to do; it is hard to understand from this part. If you need more help, try to make the question clearer and update it with more details. In the else branch, reusing the variable i seems confusing, as it is not clear whether you actually want the local i or the "shadowed" field i (see the snippet below). Doesn't your IDE give a warning?
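
For reference, a minimal, hypothetical illustration of that kind of shadowing (not the question's code):

    public class ShadowDemo {
        private int i = 7; // field, like the mapper's counter

        void demo() {
            // the loop variable i shadows the field i inside the loop body
            for (int i = 0; i < 3; i++) {
                System.out.println(i); // prints the local i: 0, 1, 2
            }
            System.out.println(this.i); // the field i is untouched: 7
        }

        public static void main(String[] args) {
            new ShadowDemo().demo();
        }
    }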

*This is a very bad practice, because you do not know which value i will take in each mapper; that depends on how the data is partitioned.

I have clarified the question now. I hope it is clear; please tell me what is going wrong here. –

I have found the problem. Actually, the values of i and no_line are both zero even after the setup() method is called, so the if branch is never executed. That is why no output comes from the mapper. Can you tell me why setup() is not working? –

@KISHANSHARMA because there is no such file at this path (check the logger error messages), or the file is empty. That does not mean the if statement is not checked; it means it is checked, but the flow goes to the else branch and then does nothing, because no_line is 0 – vefthym
