
Hi, please find my code below, which is throwing an exception. The MapReduce program fails with an IOException: "Type mismatch in key from map".

package HadoopMapReduce; 

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class HospitalsMapReduce { 

    public static class TokenizerMapper 
      extends Mapper<Text, Text, Text, Text> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 
     private Text val = new Text(); 

     public void map(Text key, Text value, Reducer.Context context) throws IOException, InterruptedException { 
      System.out.println("This is Value " + value); 

      String rec[] = value.toString().split(","); 
      String disease=rec[0]; 
      String name = rec[1]; 
      String loc = rec[2]; 
      int budget = Integer.parseInt(rec[3]); 
      int rating = Integer.parseInt(rec[4]); 
      String val1=1+","+name+","+budget+","+rating; 



      if (loc.equalsIgnoreCase("Pune")) { 
       word.set(disease); 
       val.set(val1); 
       context.write(word, val); 
      } 

     } 
    } 

    public static class IntSumReducer 
      extends Reducer<Text, Text, Text, Text> { 

     private Text result = new Text(); 

     public void reduce(Text key, Iterator<Text> values, 
       Reducer.Context context 
     ) throws IOException, InterruptedException { 


      int sum = 0; 
      int budget=0; 
      float avgBudget=0; 
      while(values.hasNext()) 
      { 
        String value[]=values.next().toString().split(","); 
        sum=sum+Integer.parseInt(value[0]); 
        budget=budget+ Integer.parseInt(value[2]);     
      } 

      avgBudget=budget/sum; 

      result.set(sum+" "+avgBudget); 
      context.write(key, result); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000"); 
     FileSystem hdfs = FileSystem.get(conf); 
     Path output = new Path("/test/output2/"); 
     if (hdfs.exists(output)) { 
      hdfs.delete(output, true); 
     } 
     Job job = Job.getInstance(conf, "Hospital count"); 
     job.setJarByClass(HospitalCount.class); 
     job.setMapperClass(TokenizerMapper.class); 
     job.setCombinerClass(IntSumReducer.class); 
     job.setReducerClass(IntSumReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 
     MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class,Text.class); 

     FileInputFormat.addInputPath(job, new Path("/test/hospital")); 
     FileOutputFormat.setOutputPath(job, output); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

Here is my error log:

SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/usr/local/NetBeansProjects/BDGRUSDML/Libs/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/usr/local/NetBeansProjects/BDGRUSDML/Libs/slf4j-nop-1.7.12.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
2016-05-29 11:50:41,302 WARN util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
2016-05-29 11:50:41,965 INFO Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(1173)) - session.id is deprecated. Instead, use dfs.metrics.session-id 
2016-05-29 11:50:41,965 INFO jvm.JvmMetrics (JvmMetrics.java:init(76)) - Initializing JVM Metrics with processName=JobTracker, sessionId= 
2016-05-29 11:50:42,024 WARN mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(64)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 
2016-05-29 11:50:42,046 WARN mapreduce.JobResourceUploader (JobResourceUploader.java:uploadFiles(171)) - No job jar file set. User classes may not be found. See Job or Job#setJar(String). 
2016-05-29 11:50:42,093 INFO input.FileInputFormat (FileInputFormat.java:listStatus(283)) - Total input paths to process : 1 
2016-05-29 11:50:42,148 INFO mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(198)) - number of splits:1 
2016-05-29 11:50:42,255 INFO mapreduce.JobSubmitter (JobSubmitter.java:printTokens(287)) - Submitting tokens for job: job_local527592655_0001 
2016-05-29 11:50:42,439 INFO mapreduce.Job (Job.java:submit(1294)) - The url to track the job: http://localhost:8080/ 
2016-05-29 11:50:42,440 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1339)) - Running job: job_local527592655_0001 
2016-05-29 11:50:42,441 INFO mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(471)) - OutputCommitter set in config null 
2016-05-29 11:50:42,450 INFO output.FileOutputCommitter (FileOutputCommitter.java:<init>(100)) - File Output Committer Algorithm version is 1 
2016-05-29 11:50:42,455 INFO mapred.LocalJobRunner (LocalJobRunner.java:createOutputCommitter(489)) - OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
2016-05-29 11:50:42,537 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(448)) - Waiting for map tasks 
2016-05-29 11:50:42,538 INFO mapred.LocalJobRunner (LocalJobRunner.java:run(224)) - Starting task: attempt_local527592655_0001_m_000000_0 
2016-05-29 11:50:42,565 INFO output.FileOutputCommitter (FileOutputCommitter.java:<init>(100)) - File Output Committer Algorithm version is 1 
2016-05-29 11:50:42,579 INFO mapred.Task (Task.java:initialize(612)) - Using ResourceCalculatorProcessTree : [ ] 
2016-05-29 11:50:42,584 INFO mapred.MapTask (MapTask.java:runNewMapper(756)) - Processing split: hdfs://127.0.0.1:9000/test/hospital/hospitals.txt:0+624 
2016-05-29 11:50:42,671 INFO mapred.MapTask (MapTask.java:setEquator(1205)) - (EQUATOR) 0 kvi 26214396(104857584) 
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(998)) - mapreduce.task.io.sort.mb: 100 
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(999)) - soft limit at 83886080 
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(1000)) - bufstart = 0; bufvoid = 104857600 
2016-05-29 11:50:42,672 INFO mapred.MapTask (MapTask.java:init(1001)) - kvstart = 26214396; length = 6553600 
2016-05-29 11:50:42,675 INFO mapred.MapTask (MapTask.java:createSortingCollector(403)) - Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
2016-05-29 11:50:42,733 INFO mapred.MapTask (MapTask.java:flush(1460)) - Starting flush of map output 
2016-05-29 11:50:42,747 INFO mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete. 
2016-05-29 11:50:42,760 WARN mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local527592655_0001 
java.lang.Exception: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522) 
Caused by: java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, received org.apache.hadoop.io.LongWritable 
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:1072) 
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:715) 
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89) 
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112) 
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:125) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
2016-05-29 11:50:43,444 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1360)) - Job job_local527592655_0001 running in uber mode : false 
2016-05-29 11:50:43,446 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1367)) - map 0% reduce 0% 
2016-05-29 11:50:43,449 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1380)) - Job job_local527592655_0001 failed with state FAILED due to: NA 
2016-05-29 11:50:43,465 INFO mapreduce.Job (Job.java:monitorAndPrintJob(1385)) - Counters: 0 

EDIT:


After making the change below, from Reducer.Context to Context in the map signature, I am now able to execute my code. It seems that with the Reducer.Context parameter the method was not actually overriding Mapper.map, so Hadoop ran the default identity mapper, which emitted the file's LongWritable offsets as keys and caused the type mismatch.

public void map(Text key, Text value, Context context) throws IOException, InterruptedException
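
As a side note, a minimal sketch (assuming the new org.apache.hadoop.mapreduce API) of how the @Override annotation would have caught this at compile time:

public static class TokenizerMapper extends Mapper<Object, Text, Text, Text> {

    // With @Override the compiler rejects any method whose signature does not
    // match Mapper.map, so a Reducer.Context parameter becomes a compile-time
    // error instead of a silent fall-through to the default identity mapper.
    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // mapper logic as above
    }
}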

Please find the updated code below:

package HadoopMapReduce; 

import java.io.IOException; 
import java.util.Iterator; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class HospitalsMapReduce { 

    public static class TokenizerMapper 
      extends Mapper<Object, Text, Text, Text> { 

     private final static IntWritable one = new IntWritable(1); 
     private Text word = new Text(); 
     private Text val = new Text(); 

     public void map(Object key, Text value, Context context) throws IOException, InterruptedException { 
      System.out.println("This is Value " + value); 

      String rec[] = value.toString().split(","); 
      String disease=rec[0]; 
      String name = rec[1]; 
      String loc = rec[2]; 
      int budget = Integer.parseInt(rec[3]); 
      int rating = Integer.parseInt(rec[4]); 
      String val1=1+","+name+","+budget+","+rating; 



      if (loc.equalsIgnoreCase("Pune")) { 
       word.set(disease); 
       val.set(val1); 
       context.write(word, val); 
      } 

     } 
    } 

    public static class IntSumReducer 
      extends Reducer<Text, Iterator<Text>, Text, Text> { 

     private Text result = new Text(); 

     public void reduce(Text key, Iterator<Text> values, 
       Context context 
     ) throws IOException, InterruptedException { 


      int sum = 0; 
      int budget=0; 
      float avgBudget=0; 

      System.out.println("This is Reducer Jobs"); 

      while(values.hasNext()) 
      { 
        String value[]=values.next().toString().split(","); 
        System.out.println("This is Value " + value); 
        sum=sum+Integer.parseInt(value[0]); 
        budget=budget+ Integer.parseInt(value[2]);     
      } 

      avgBudget=budget/sum; 

      result.set(sum+" "+avgBudget); 
      context.write(key, result); 
     } 
    } 

    public static void main(String[] args) throws Exception { 
     Configuration conf = new Configuration(); 
     conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000"); 
     FileSystem hdfs = FileSystem.get(conf); 
     Path output = new Path("/test/output2/"); 
     if (hdfs.exists(output)) { 
      hdfs.delete(output, true); 
     } 
     Job job = Job.getInstance(conf, "Hospital_count"); 
     job.setJarByClass(HospitalsMapReduce.class); 
     job.setMapperClass(TokenizerMapper.class); 
     //job.setCombinerClass(IntSumReducer.class); 
     job.setReducerClass(IntSumReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 
     MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class, Text.class, IntWritable.class); 

     FileInputFormat.addInputPath(job, new Path("/test/hospital/")); 
     FileOutputFormat.setOutputPath(job, output); 
     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 
} 

But now my problem is that my reducer function is not getting executed; my output shows only the output of the map function.


Why do you declare MultipleOutputs if you don't use it? Also, you should cast 'budget' or 'sum' to float or double before the division for 'avgBudget'. Do you see "This is Reducer Jobs" and "This is Value ..." in the logs? – vefthym
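
On the division point, a one-line sketch of the cast the comment suggests:

// budget / sum is integer division and truncates before the assignment;
// casting one operand makes the division happen in floating point.
avgBudget = (float) budget / sum;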

Answers


Use Hadoop's Iterable here instead of java.util.Iterator. In the new mapreduce API, Reducer.reduce takes an Iterable<Text>, so a reduce method declared with an Iterator parameter does not override it; the framework then falls back to the default identity reduce, which simply passes the map output through. That is why you only see the map function's output.

Change your reducer definition and code as follows:

public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        System.out.println("This is Red Value ");
        int sum = 0;
        int budget = 0;
        float avgBudget = 0;
        System.out.println("This is Reducer Jobs");
        for (Text val : values) {
            String value[] = val.toString().split(",");
            System.out.println("This is Reduce Value " + val); // print the Text, not the array
            sum = sum + Integer.parseInt(value[0]);
            budget = budget + Integer.parseInt(value[2]);
        }
        // cast so the division happens in floating point instead of truncating
        avgBudget = (float) budget / sum;
        result.set(sum + " " + avgBudget);
        context.write(key, result);
    }
}

Thanks, after a few changes it now works perfectly. – Himanshu


I would summarize your problem as:

"My keys and values are both strings (Text), but the Map/Reduce framework believes I am supplying numbers (LongWritable)."

Well, I would agree: since all of your mapper/reducer keys and values are Text, it is most likely impossible for this source code to produce that error.

Therefore, you may want to look at how your application jar file is packaged, to check whether the correct version is being shipped to your Hadoop cluster. Otherwise, it does not look like your code could end up with the given exception.
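
As a small sketch of pinning the job jar explicitly (the log above does warn "No job jar file set. User classes may not be found."); the commented-out path below is hypothetical:

// Either call makes the submitted job ship a jar containing your classes.
job.setJarByClass(HospitalsMapReduce.class);      // infer the jar from this class
// job.setJar("/path/to/HospitalsMapReduce.jar"); // or point at a jar explicitly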


Please take a look at my recently updated code. Now my reducer function is not executing. – Himanshu


It looks like your combiner is causing the problem. You have used your reducer function as the combiner, but the output format of your map function and of your combiner function are not the same, and that must not happen. The combiner is invoked on the output of the map function, and its output becomes the input of further combiner passes or of the reduce operation. The reducer expects the data reaching it to be in the same key-value format as the map output, whether or not it went through a combiner.

Also, from the code written above, I find that computing the average inside the combine function is not the right approach; the average would never be correct.

In any case, remove the combiner operation for now, since it is only a performance boost. Reintroduce it once you know your code works correctly end to end.
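
For reference, a minimal sketch (not the poster's code) of a combiner that would be safe for this job: it pre-aggregates the count and budget totals while keeping the mapper's positional "count,name,budget,rating" value format, and leaves the average itself to the reducer:

public static class PartialSumCombiner extends Reducer<Text, Text, Text, Text> {

    private final Text out = new Text();

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        int budget = 0;
        for (Text val : values) {
            String[] fields = val.toString().split(",");
            count += Integer.parseInt(fields[0]);   // running hospital count
            budget += Integer.parseInt(fields[2]);  // running budget total
        }
        // "-" placeholders keep the name and rating positions intact, so the
        // reducer can keep parsing fields 0 and 2 unchanged.
        out.set(count + ",-," + budget + ",-");
        context.write(key, out);
    }
}

It would be wired in with job.setCombinerClass(PartialSumCombiner.class); the reducer then computes avgBudget from the final totals, which is the only place an average is valid.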


Hi @preeti, this is not my actual logic. I was just trying things out with MapReduce. – Himanshu


Please check the latest edit and suggest accordingly. – Himanshu


Change this line: public static class IntSumReducer extends Reducer<Text, Iterator<Text>, Text, Text> to: public static class IntSumReducer extends Reducer<Text, Text, Text, Text> –


This is what your reducer definition should look like, using Java's Iterator:

public static class IntSumReducer 
      extends Reducer<Text, Text, Text, Text> { 

    public void reduce(Text key, Iterator<Text> values, 
       Context context) throws IOException, InterruptedException { 

    //your logic 
    } 

} 