2015-04-16

Below is my code for a simple MapReduce job that uses a custom Writable class. The reduce phase does not start after the map phase completes.

public class MapReduceKMeans { 

public static class MapReduceKMeansMapper extends 
     Mapper<Object, Text, SongDataPoint, Text> { 
    public void map(Object key, Text value, Context context) 
      throws InterruptedException, IOException { 
     String str = value.toString(); 
     // Reading Line one by one from the input CSV. 
     String split[] = str.split(","); 

     String trackId = split[0]; 
     String title = split[1]; 
     String artistName = split[2]; 
     SongDataPoint songDataPoint = 
       new SongDataPoint(new Text(trackId), new Text(title), 
         new Text(artistName)); 
     context.write(songDataPoint, new Text()); 
     } 
    } 


public static class MapReduceKMeansReducer extends 
Reducer<SongDataPoint, Text, Text, NullWritable> { 
    public void reduce(SongDataPoint key, Iterable<Text> values, 
      Context context) throws IOException, InterruptedException { 
     StringBuilder sb = new StringBuilder(); 
     sb.append(key.getTrackId()).append("\t"). 
     append(key.getTitle()).append("\t") 
     .append(key.getArtistName()).append("\t"); 

     String write = sb.toString(); 

     context.write(new Text(write), NullWritable.get()); 
    } 

} 

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { 
    Configuration conf = new Configuration(); 
    String[] otherArgs = new GenericOptionsParser(conf, args) 
      .getRemainingArgs(); 

    if (otherArgs.length != 2) { 
     System.err 
       .println("Usage:<CsV Out Path> <Final Out Path>"); 
     System.exit(2); 
    } 



    Job job = new Job(conf, "Song Data Trial"); 
    job.setJarByClass(MapReduceKMeans.class); 
    job.setMapperClass(MapReduceKMeansMapper.class); 
    job.setReducerClass(MapReduceKMeansReducer.class); 
    job.setOutputKeyClass(SongDataPoint.class); 
    job.setOutputValueClass(Text.class); 

    FileInputFormat.addInputPath(job, new Path(otherArgs[0])); 
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
    System.exit(job.waitForCompletion(true) ? 0 : 1); 
} 
} 

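When I debug my code, every line of the CSV file is read, but execution never enters the reduce phase.

I am also using SongDataPoint as my custom Writable.

Its code is as follows.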

public class SongDataPoint implements WritableComparable<SongDataPoint> { 

Text trackId; 
Text title; 
Text artistName; 

public SongDataPoint() { 
    this.trackId = new Text(); 
    this.title = new Text(); 
    this.artistName = new Text(); 
} 

public SongDataPoint(Text trackId, Text title, Text artistName) { 
    this.trackId = trackId; 
    this.title = title; 
    this.artistName = artistName; 
} 

@Override 
public void readFields(DataInput in) throws IOException { 
    this.trackId.readFields(in); 
    this.title.readFields(in); 
    this.artistName.readFields(in); 
} 

@Override 
public void write(DataOutput out) throws IOException { 

} 

public Text getTrackId() { 
    return trackId; 
} 

public void setTrackId(Text trackId) { 
    this.trackId = trackId; 
} 

public Text getTitle() { 
    return title; 
} 

public void setTitle(Text title) { 
    this.title = title; 
} 

public Text getArtistName() { 
    return artistName; 
} 

public void setArtistName(Text artistName) { 
    this.artistName = artistName; 
} 


@Override 
public int compareTo(SongDataPoint o) { 
    // TODO Auto-generated method stub 
    int compare = getTrackId().compareTo(o.getTrackId()); 
    return compare; 
} 

} 

Any help is appreciated. Thanks.

Answers


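In the driver you declare the output key class as SongDataPoint.class and the output value class as Text.class, but the reducer actually writes Text as the key and NullWritable as the value.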
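One way to reconcile the declared types with what the mapper and reducer emit is sketched below. It assumes the reducer keeps writing Text keys and NullWritable values, as in the question. `JobTypes` and `declareTypes` are hypothetical names used only for illustration; the four setters are methods of Hadoop's `Job` class, so the snippet needs the Hadoop libraries on the classpath.

```java
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical helper: declares map-side and final output types separately.
public final class JobTypes {

    private JobTypes() { }

    static void declareTypes(Job job, Class<?> mapOutputKeyClass) {
        // Types written by the mapper (the intermediate, shuffled data).
        job.setMapOutputKeyClass(mapOutputKeyClass);
        job.setMapOutputValueClass(Text.class);

        // Types written by the reducer (the final job output).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
    }
}
```

In the question's `main` method this would be called as `JobTypes.declareTypes(job, SongDataPoint.class);` in place of the two existing `setOutput...Class` calls.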


You should also specify the mapper output key and value classes, as follows.

job.setMapOutputKeyClass(SongDataPoint.class); 
job.setMapOutputValueClass(Text.class); 

The write method in my custom Writable class had mistakenly been left empty. Filling it in with the correct code solved the problem.

public void write(DataOutput out) throws IOException { 

}
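The thread does not show the corrected method, so here is a minimal sketch of the idea: `write` must serialize every field in the same order in which `readFields` reads them back. To keep it runnable without Hadoop, `SongKeySketch` is a hypothetical stand-in for SongDataPoint that uses plain `String` fields and `java.io` streams; it mirrors the shape of the Writable methods but does not implement Hadoop's interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for SongDataPoint (assumption: String fields, no Hadoop).
class SongKeySketch {
    String trackId = "";
    String title = "";
    String artistName = "";

    SongKeySketch() { }

    SongKeySketch(String trackId, String title, String artistName) {
        this.trackId = trackId;
        this.title = title;
        this.artistName = artistName;
    }

    // Serialize every field, in a fixed order.
    public void write(DataOutput out) throws IOException {
        out.writeUTF(trackId);
        out.writeUTF(title);
        out.writeUTF(artistName);
    }

    // Read the fields back in exactly the order write() emitted them.
    public void readFields(DataInput in) throws IOException {
        trackId = in.readUTF();
        title = in.readUTF();
        artistName = in.readUTF();
    }

    // Serializes a key to a byte buffer and deserializes it into a fresh object.
    static SongKeySketch roundTrip(SongKeySketch key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            SongKeySketch copy = new SongKeySketch();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SongKeySketch copy = roundTrip(
                new SongKeySketch("TR001", "Some Title", "Some Artist"));
        System.out.println(copy.trackId + "\t" + copy.title + "\t" + copy.artistName);
    }
}
```

In the actual SongDataPoint class, whose fields are Hadoop `Text` objects, the equivalent body would presumably be `trackId.write(out); title.write(out); artistName.write(out);`, mirroring the three `readFields` calls already present.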