
I am a beginner with Hadoop, and I am trying to run a reduce-side join example, but it gets stuck: map 100% and reduce 100%, yet the job never finishes. The progress output, logs, code, sample data, and configuration files are below.

Progress:

12/10/02 15:48:06 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/10/02 15:48:06 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/10/02 15:48:06 INFO mapred.FileInputFormat: Total input paths to process : 2 
12/10/02 15:48:07 INFO mapred.JobClient: Running job: job_201210021515_0007 
12/10/02 15:48:08 INFO mapred.JobClient: map 0% reduce 0% 
12/10/02 15:48:26 INFO mapred.JobClient: map 66% reduce 0% 
12/10/02 15:48:35 INFO mapred.JobClient: map 100% reduce 0% 
12/10/02 15:48:38 INFO mapred.JobClient: map 100% reduce 22% 
12/10/02 15:48:47 INFO mapred.JobClient: map 100% reduce 100% 

Logs from Reduce task: 
2012-10-02 15:48:28,018 INFO org.apache.hadoop.mapred.Task: Using ResourceCalculatorPlugin : [email protected] 
2012-10-02 15:48:28,179 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=668126400, MaxSingleShuffleLimit=167031600 
2012-10-02 15:48:28,202 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Thread started: Thread for merging on-disk files 
2012-10-02 15:48:28,202 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Thread started: Thread for merging in memory files 
2012-10-02 15:48:28,203 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Thread waiting: Thread for merging on-disk files 
2012-10-02 15:48:28,207 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Thread started: Thread for polling Map Completion Events 
2012-10-02 15:48:28,207 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Need another 3 map output(s) where 0 is already in progress 
2012-10-02 15:48:28,208 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts) 
2012-10-02 15:48:33,209 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts) 
2012-10-02 15:48:33,596 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts) 
2012-10-02 15:48:38,606 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201210021515_0007_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts) 
2012-10-02 15:48:39,239 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting 
2012-10-02 15:48:39,239 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined. 
2012-10-02 15:48:39,241 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager 
2012-10-02 15:48:39,242 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 0 files left. 
2012-10-02 15:48:39,242 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 3 files left. 
2012-10-02 15:48:39,285 INFO org.apache.hadoop.mapred.Merger: Merging 3 sorted segments 
2012-10-02 15:48:39,285 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 3 segments left of total size: 10500 bytes 
2012-10-02 15:48:39,314 INFO org.apache.hadoop.mapred.ReduceTask: Merged 3 segments, 10500 bytes to disk to satisfy reduce memory limit 
2012-10-02 15:48:39,318 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 10500 bytes from disk 
2012-10-02 15:48:39,319 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce 
2012-10-02 15:48:39,320 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments 
2012-10-02 15:48:39,322 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 10496 bytes 

Java Code: 

import java.io.DataInput; 
import java.io.DataOutput; 
import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.conf.Configured; 
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase; 
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase; 
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.Writable; 
import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.FileOutputFormat; 
import org.apache.hadoop.mapred.JobClient; 
import org.apache.hadoop.mapred.JobConf; 
import org.apache.hadoop.mapred.TextInputFormat; 
import org.apache.hadoop.mapred.TextOutputFormat; 
import org.apache.hadoop.util.ReflectionUtils; 
import org.apache.hadoop.util.Tool; 
import org.apache.hadoop.util.ToolRunner; 

public class DataJoin extends Configured implements Tool { 

    public static class MapClass extends DataJoinMapperBase { 

     protected Text generateInputTag(String inputFile) {//specify tag 
      String datasource = inputFile.split("-")[0]; 
      return new Text(datasource); 
     } 

     protected Text generateGroupKey(TaggedMapOutput aRecord) {//takes a tagged record (of type TaggedMapOutput) and returns the group key for joining 
      String line = ((Text) aRecord.getData()).toString(); 
      String[] tokens = line.split(",", 2); 
      String groupKey = tokens[0]; 
      return new Text(groupKey); 
     } 

     protected TaggedMapOutput generateTaggedMapOutput(Object value) {//wraps the record value into a TaggedMapOutput type 
      TaggedWritable retv = new TaggedWritable((Text) value); 
      retv.setTag(this.inputTag);//inputTag: result of generateInputTag 
      return retv; 
     } 
    } 

    public static class Reduce extends DataJoinReducerBase { 

     protected TaggedMapOutput combine(Object[] tags, Object[] values) {//combination of the cross product of the tagged records with the same join (group) key 

      if (tags.length != 2) return null; 
      String joinedStr = ""; 
      for (int i=0; i<tags.length; i++) { 
       if (i > 0) joinedStr += ","; 
       TaggedWritable tw = (TaggedWritable) values[i]; 
       String line = ((Text) tw.getData()).toString(); 
       if (line == null) 
        return null; 
       String[] tokens = line.split(",", 2); 
       joinedStr += tokens[1]; 
      } 
      TaggedWritable retv = new TaggedWritable(new Text(joinedStr)); 
      retv.setTag((Text) tags[0]); 
      return retv; 
     } 
    } 

    public static class TaggedWritable extends TaggedMapOutput {//tagged record 

     private Writable data; 

     public TaggedWritable() { 
      this.tag = new Text(""); 
      this.data = null; 
     } 

     public TaggedWritable(Writable data) { 
      this.tag = new Text(""); 
      this.data = data; 
     } 

     public Writable getData() { 
      return data; 
     } 

     @Override 
     public void write(DataOutput out) throws IOException { 
      this.tag.write(out); 
      out.writeUTF(this.data.getClass().getName());    
      this.data.write(out); 
     } 

     @Override 
     public void readFields(DataInput in) throws IOException { 
      this.tag.readFields(in); 
      String dataClz = in.readUTF(); 
      if ((this.data == null) || !this.data.getClass().getName().equals(dataClz)) { 
       try { 
        this.data = (Writable) ReflectionUtils.newInstance(Class.forName(dataClz), null); 
        System.out.printf(dataClz); 
       } catch (ClassNotFoundException e) { 
        e.printStackTrace(); 
       } 
      } 
      this.data.readFields(in); 
     } 
    } 

    public int run(String[] args) throws Exception { 
     Configuration conf = getConf(); 

     JobConf job = new JobConf(conf, DataJoin.class); 

     Path in = new Path(args[0]); 
     Path out = new Path(args[1]); 
     FileInputFormat.setInputPaths(job, in); 
     FileOutputFormat.setOutputPath(job, out); 

     job.setJobName("DataJoin"); 
     job.setMapperClass(MapClass.class); 
     job.setReducerClass(Reduce.class); 

     job.setInputFormat(TextInputFormat.class); 
     job.setOutputFormat(TextOutputFormat.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(TaggedWritable.class); 
     job.set("mapred.textoutputformat.separator", ","); 
     JobClient.runJob(job); 
     return 0; 
    } 

    public static void main(String[] args) throws Exception { 
     int res = ToolRunner.run(new Configuration(), 
           new DataJoin(), 
           args); 

     System.exit(res); 
    } 
} 

Sample data:

file 1: apat.txt (1 line) 
    4373932,1983,8446,1981,"NL","",16025,2,65,436,1,19,108,49,1,0.5289,0.6516,9.8571,4.1481,0.0109,0.0093,0,0 
file 2: cite.txt (100 lines) 
    4373932,3641235 
    4373932,3720760 
    4373932,3853987 
    4373932,3900558 
    4373932,3939350 
    4373932,3941876 
    4373932,3992631 
    4373932,3996345 
    4373932,3998943 
    4373932,3999948 
    4373932,4001400 
    4373932,4011219 
    4373932,4025310 
    4373932,4036946 
    4373932,4058732 
    4373932,4104029 
    4373932,4108972 
    4373932,4160016 
    4373932,4160018 
    4373932,4160019 
    4373932,4160818 
    4373932,4161515 
    4373932,4163779 
    4373932,4168146 
    4373932,4169137 
    4373932,4181650 
    4373932,4187075 
    4373932,4197361 
    4373932,4199599 
    4373932,4200436 
    4373932,4201763 
    4373932,4207075 
    4373932,4208479 
    4373932,4211766 
    4373932,4215102 
    4373932,4220450 
    4373932,4222744 
    4373932,4225783 
    4373932,4231750 
    4373932,4234563 
    4373932,4235869 
    4373932,4238195 
    4373932,4238395 
    4373932,4248854 
    4373932,4251514 
    4373932,4258130 
    4373932,4248965 
    4373932,4252783 
    4373932,4254097 
    4373932,4259313 
    4373932,4272505 
    4373932,4272506 
    4373932,4277437 
    4373932,4279992 
    4373932,4283382 
    4373932,4294817 
    4373932,4296201 
    4373932,4297273 
    4373932,4298687 
    4373932,4302534 
    4373932,4314026 
    4373932,4318707 
    4373932,4318846 
    4373932,3773625 
    4373932,3935074 
    4373932,3951748 
    4373932,3992516 
    4373932,3996344 
    4373932,3997657 
    4373932,4011308 
    4373932,4016250 
    4373932,4018884 
    4373932,4056724 
    4373932,4067959 
    4373932,4069352 
    4373932,4097586 
    4373932,4098876 
    4373932,4130462 
    4373932,4152411 
    4373932,4153675 
    4373932,4174384 
    4373932,4222743 
    4373932,4254096 
    4373932,4256834 
    4373932,4284412 
    4373932,4323647 
    4373932,3985867 
    4373932,4166105 
    4373932,4278653 
    4373932,4194877 
    4373932,4202815 
    4373932,4286959 
    4373932,4302536 
    4373932,4020151 
    4373932,4115535 
    4373932,4152412 
    4373932,4177253 
    4373932,4223002 
    4373932,4225485 
    4373932,4261968 
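
For reference, this is not output from an actual run, just what the combine() logic above implies for this data: every cite.txt record shares the group key 4373932 with the single apat.txt record, so a successful join should emit one line per citation, roughly of the form (the exact field order depends on which source ends up as tags[0]):

    4373932,1983,8446,1981,"NL","",16025,2,65,436,1,19,108,49,1,0.5289,0.6516,9.8571,4.1481,0.0109,0.0093,0,0,3641235
    4373932,1983,8446,...,0,0,3720760
    ...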

Configuration files:

core-site.xml 
<!-- In: conf/core-site.xml --> 
<property> 
<name>hadoop.tmp.dir</name> 
<value>/your/path/to/hadoop/tmp/dir/hadoop-${user.name}</value> 
<description>A base for other temporary directories.</description> 
</property> 
<property> 
<name>fs.default.name</name> 
<value>hdfs://localhost:54310</value> 
<description>The name of the default file system. A URI whose 
scheme and authority determine the FileSystem implementation. The 
uri's scheme determines the config property (fs.SCHEME.impl) naming 
the FileSystem implementation class. The uri's authority is used to 
determine the host, port, etc. for a filesystem.</description> 
</property> 

mapred-site.xml 
<!-- In: conf/mapred-site.xml --> 
<property> 
<name>mapred.job.tracker</name> 
<value>localhost:54311</value> 
<description>The host and port that the MapReduce job tracker runs 
at. If "local", then jobs are run in-process as a single map 
and reduce task. 
</description> 
</property> 

hdfs-site.xml 
<!-- In: conf/hdfs-site.xml --> 
<property> 
<name>dfs.replication</name> 
<value>1</value> 
<description>Default block replication. 
The actual number of replications can be specified when the file is created. 
The default is used if replication is not specified in create time. 
</description> 
</property> 

I Googled for answers and made some changes to the code and to some settings in the (mapred/core/hdfs)-site.xml files, but I am still lost. I run this code in pseudo-distributed mode. The join keys from the two files match. If I cut cite.txt down to 99 lines or fewer, the job runs fine; with 100 lines or more, it gets stuck exactly as in the logs shown above. Please help me figure out this problem. Thanks in advance for any explanation.

Best regards, Hailong

Answer


Please check your reduce class. I ran into a similar problem, and it turned out to be a very silly mistake. Maybe this will help you solve yours:

while (values.hasNext()) { 
    String val = values.next().toString(); 
     ..... 
} 

You need to add the .next() call; without it the iterator never advances and the loop never terminates.
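
To spell that out, here is a minimal sketch of the pattern the answer describes, using a hypothetical standalone reducer (not the DataJoin code posted above). If the loop only checks hasNext() and never calls next(), the iterator never advances, hasNext() keeps returning true, and the reduce task spins forever, which looks exactly like a job stuck at "reduce 100%" that never completes.

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapred.MapReduceBase; 
import org.apache.hadoop.mapred.OutputCollector; 
import org.apache.hadoop.mapred.Reducer; 
import org.apache.hadoop.mapred.Reporter; 

// Hypothetical reducer, only to illustrate the iterator mistake. 
public class IteratorReduceExample extends MapReduceBase 
    implements Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterator<Text> values, 
      OutputCollector<Text, Text> output, Reporter reporter) 
      throws IOException { 

     // BROKEN: hasNext() is checked but next() is never called, so the 
     // iterator never advances and this loop never ends. 
     // while (values.hasNext()) { 
     //  String val = values.toString(); 
     // } 

     // FIXED: call next() on every pass so the iterator advances. 
     while (values.hasNext()) { 
      String val = values.next().toString(); 
      output.collect(key, new Text(val)); 
     } 
    } 
} 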