
I am new to Hadoop and I am trying to get the maximum salary of an employee, but I am getting java.lang.RuntimeException: "problem advancing post rec#0". My data looks like this:

1231,"","","",4000<br/> 
    1232,"","","",5000<br/> 
    ..................<br/> 
    ..................<br/> 

This is my mapper class, where I am trying to emit the complete tuple:

public class maxmap extends MapReduceBase implements Mapper<LongWritable,Text,Text,employee> { 


    @Override 
    public void map(LongWritable key, Text value, 
      OutputCollector<Text,employee> outputCollector, Reporter reporter) 
      throws IOException { 
     // TODO Auto-generated method stub 
     //TupleWritable tupleWritable= new TupleWritable(new Writable[]{new Text(value.toString().split(",")[1]), 
       //new Text(value.toString().split(",")[0]) 
     //}); 

     String employeeId = value.toString().split(",")[0]; 

     int count =1231; 
     employee employee=new employee(Integer.parseInt(employeeId), "employeeName", "StemployeeDept", "employeeJoinDt",1231); 
     //tupleWritable.write(); 
     outputCollector.collect(new Text("max salry"),employee); 

    } 
} 

This is my reducer class:

public class maxsalreduce extends MapReduceBase implements Reducer<Text,employee,Text,IntWritable> { 

    @Override 
    public void reduce(Text key, Iterator<employee> values, 
      OutputCollector<Text, IntWritable> collector, Reporter reporter) 
      throws IOException { 
     // TODO Auto-generated method stub 
     System.out.println("in reducer"); 
     while(values.hasNext()){ 
      employee employee=values.next(); 
      System.out.println("employee id"+employee.employeeId); 

     } 
    collector.collect(new Text(""), new IntWritable(1)); 

    } 
} 

This is my employee class:

public class employee implements Writable{ 

    public int employeeId; 
    private String employeeName; 
    private String employeeDept; 
    private String employeeJoinDt; 
    public employee(int employeeId,String employeeName,String employeeDept,String employeeJoinDt,int employeeSalary){ 
     this.employeeId=employeeId; 
     System.out.println(this.employeeId); 
     this.employeeName=employeeName; 
     this.employeeDept=employeeDept; 
     this.employeeJoinDt=employeeJoinDt; 
     this.employeeSalary=employeeSalary; 
    } 
    public employee() { 
     // TODO Auto-generated constructor stub 
    } 
    public int getEmployeeId() { 
     return employeeId; 
    } 

    public void setEmployeeId(int employeeId) { 
     this.employeeId = employeeId; 
    } 

    public String getEmployeeName() { 
     return employeeName; 
    } 

    public void setEmployeeName(String employeeName) { 
     this.employeeName = employeeName; 
    } 

    public String getEmployeeDept() { 
     return employeeDept; 
    } 

    public void setEmployeeDept(String employeeDept) { 
     this.employeeDept = employeeDept; 
    } 

    public String getEmployeeJoinDt() { 
     return employeeJoinDt; 
    } 

    public void setEmployeeJoinDt(String employeeJoinDt) { 
     this.employeeJoinDt = employeeJoinDt; 
    } 

    public int getEmployeeSalary() { 
     return employeeSalary; 
    } 

    public void setEmployeeSalary(int employeeSalary) { 
     this.employeeSalary = employeeSalary; 
    } 

    private int employeeSalary; 
    @Override 
    public void readFields(DataInput input) throws IOException { 
     // TODO Auto-generated method stub 
     System.out.println("employee id is"+input.readInt()); 
     //this.employeeId=input.readInt(); 
     //this.employeeName=input.readUTF(); 
     //this.employeeDept=input.readUTF(); 
     //this.employeeJoinDt=input.readUTF(); 
     //this.employeeSalary=input.readInt(); 
     new employee(input.readInt(),input.readUTF(),input.readUTF(),input.readUTF(),input.readInt()); 
    } 

    @Override 
    public void write(DataOutput output) throws IOException { 
     // TODO Auto-generated method stub 
     output.writeInt(this.employeeId); 
     output.writeUTF(this.employeeName); 
     output.writeUTF(this.employeeDept); 
     output.writeUTF(this.employeeJoinDt); 
     output.writeInt(this.employeeSalary); 

    } 
} 

This is my job runner:

public class jobrunner { 

    public static void main(String[] args) throws IOException 
    { 


     JobConf jobConf = new JobConf(jobrunner.class); 
     jobConf.setJobName("Count no of employees"); 
     jobConf.setMapperClass(maxmap.class); 
     jobConf.setReducerClass(maxsalreduce.class); 

     FileInputFormat.setInputPaths(jobConf, new Path("hdfs://localhost:9000/employee_data.txt")); 
     FileOutputFormat.setOutputPath(jobConf,new Path("hdfs://localhost:9000/dummy20.txt")); 
     jobConf.setOutputKeyClass(Text.class); 
     jobConf.setOutputValueClass(employee.class); 
     JobClient.runJob(jobConf); 
    } 
} 

This is the exception I am getting:

java.lang.RuntimeException: problem advancing post rec#0 
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1214) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.moveToNext(ReduceTask.java:249) 
    at org.apache.hadoop.mapred.ReduceTask$ReduceValuesIterator.next(ReduceTask.java:245) 
    at tuplewritable.maxsalreduce.reduce(maxsalreduce.java:24) 
    at tuplewritable.maxsalreduce.reduce(maxsalreduce.java:1) 
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:519) 
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:420) 
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:260) 
Caused by: java.io.EOFException 
    at java.io.DataInputStream.readFully(DataInputStream.java:180) 
    at java.io.DataInputStream.readUTF(DataInputStream.java:592) 
    at java.io.DataInputStream.readUTF(DataInputStream.java:547) 
    at tuplewritable.employee.readFields(employee.java:76) 
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67) 
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40) 
    at org.apache.hadoop.mapred.Task$ValuesIterator.readNextValue(Task.java:1271) 
    at org.apache.hadoop.mapred.Task$ValuesIterator.next(Task.java:1211) 
    ... 7 more 
13/08/17 20:44:14 INFO mapred.JobClient: map 100% reduce 0% 
13/08/17 20:44:14 INFO mapred.JobClient: Job complete: job_local_0001 
13/08/17 20:44:14 INFO mapred.JobClient: Counters: 21 
13/08/17 20:44:14 INFO mapred.JobClient: File Input Format Counters 
13/08/17 20:44:14 INFO mapred.JobClient:  Bytes Read=123 
13/08/17 20:44:14 INFO mapred.JobClient: FileSystemCounters 
13/08/17 20:44:14 INFO mapred.JobClient:  FILE_BYTES_READ=146 
13/08/17 20:44:14 INFO mapred.JobClient:  HDFS_BYTES_READ=123 
13/08/17 20:44:14 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=39985 
13/08/17 20:44:14 INFO mapred.JobClient: Map-Reduce Framework 
13/08/17 20:44:14 INFO mapred.JobClient:  Map output materialized bytes=270 
13/08/17 20:44:14 INFO mapred.JobClient:  Map input records=4 
13/08/17 20:44:14 INFO mapred.JobClient:  Reduce shuffle bytes=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Spilled Records=4 
13/08/17 20:44:14 INFO mapred.JobClient:  Map output bytes=256 
13/08/17 20:44:14 INFO mapred.JobClient:  Total committed heap usage (bytes)=160763904 
13/08/17 20:44:14 INFO mapred.JobClient:  CPU time spent (ms)=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Map input bytes=123 
13/08/17 20:44:14 INFO mapred.JobClient:  SPLIT_RAW_BYTES=92 
13/08/17 20:44:14 INFO mapred.JobClient:  Combine input records=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Reduce input records=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Reduce input groups=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Combine output records=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Reduce output records=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
13/08/17 20:44:14 INFO mapred.JobClient:  Map output records=4 
13/08/17 20:44:14 INFO mapred.JobClient: Job Failed: NA 
Exception in thread "main" java.io.IOException: Job failed! 
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265) 
    at tuplewritable.jobrunner.main(jobrunner.java:30) 
13/08/17 20:44:14 ERROR hdfs.DFSClient: Exception closing file /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 : org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 File does not exist. Holder DFSClient_1595916561 does not have any open files. 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1629) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1620) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1675) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1663) 
    at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:718) 
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1083) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382) 

org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException: No lease on /dummy20.txt/_temporary/_attempt_local_0001_r_000000_0/part-00000 File does not exist. Holder DFSClient_1595916561 does not have any open files. 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1629) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:1620) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:1675) 
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:1663) 
    at org.apache.hadoop.hdfs.server.namenode.NameNode.complete(NameNode.java:718) 
    at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:563) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1388) 
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:1384) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1083) 
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:1382) 

    at org.apache.hadoop.ipc.Client.call(Client.java:1066) 
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) 
    at $Proxy1.complete(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:82) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:59) 
    at $Proxy1.complete(Unknown Source) 
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.closeInternal(DFSClient.java:3894) 
    at org.apache.hadoop.hdfs.DFSClient$DFSOutputStream.close(DFSClient.java:3809) 
    at org.apache.hadoop.hdfs.DFSClient$LeaseChecker.close(DFSClient.java:1342) 
    at org.apache.hadoop.hdfs.DFSClient.close(DFSClient.java:275) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.close(DistributedFileSystem.java:328) 
    at org.apache.hadoop.fs.FileSystem$Cache.closeAll(FileSystem.java:1446) 
    at org.apache.hadoop.fs.FileSystem.closeAll(FileSystem.java:277) 
    at org.apache.hadoop.fs.FileSystem$ClientFinalizer.run(FileSystem.java:260) 

Answer


You have these errors in your classes, and they are what cause the exception.

In your Employee class, remove

System.out.println("employee id is"+input.readInt());

and also,

new employee(input.readInt(),input.readUTF(),input.readUTF(), 
    input.readUTF(),input.readInt()); 

@Override 
    public void readFields(DataInput input) throws IOException { 
     // TODO Auto-generated method stub 
     System.out.println("employee id is"+input.readInt()); 
     //this.employeeId=input.readInt(); 
     //this.employeeName=input.readUTF(); 
     //this.employeeDept=input.readUTF(); 
     //this.employeeJoinDt=input.readUTF(); 
     //this.employeeSalary=input.readInt(); 
     new employee(input.readInt(),input.readUTF(),input.readUTF(),input.readUTF(),input.readInt()); 
    } 

Reason: System.out.println("employee id is"+input.readInt()); has already deserialized the first int from your input, which is why calling input.readInt() again after it causes this problem. And as for the other line, new employee(....), you probably know there is no point in using it like that inside readFields (the newly constructed object is simply discarded and the current instance's fields are never set); at least I don't do it that way.
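A minimal sketch of how readFields could look instead, reading the fields back in exactly the same order that write() serializes them and assigning them to the current instance:

    @Override 
    public void readFields(DataInput input) throws IOException { 
     // Read the fields in the same order write() wrote them, 
     // assigning to this instance instead of constructing a new employee. 
     this.employeeId = input.readInt(); 
     this.employeeName = input.readUTF(); 
     this.employeeDept = input.readUTF(); 
     this.employeeJoinDt = input.readUTF(); 
     this.employeeSalary = input.readInt(); 
    } 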

Next, in the JobRunner class:

Remove this line:

jobConf.setOutputValueClass(employee.class); 

and add these lines instead. Your map output value class (employee) differs from your final reduce output value class (IntWritable), so they have to be declared separately:

jobConf.setMapOutputKeyClass(Text.class); 
jobConf.setMapOutputValueClass(employee.class); 
jobConf.setOutputKeyClass(Text.class); 
jobConf.setOutputValueClass(IntWritable.class); 
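With those changes in place, and assuming the mapper is also updated to parse the actual salary from each input line instead of the hard-coded 1231, the reducer for the original goal (finding the maximum salary) could look roughly like this sketch, which just scans the values and keeps the largest getEmployeeSalary():

public class maxsalreduce extends MapReduceBase implements Reducer<Text,employee,Text,IntWritable> { 

    @Override 
    public void reduce(Text key, Iterator<employee> values, 
      OutputCollector<Text, IntWritable> collector, Reporter reporter) 
      throws IOException { 
     // Track the highest salary seen among all employee values for this key. 
     int maxSalary = Integer.MIN_VALUE; 
     while (values.hasNext()) { 
      employee e = values.next(); 
      if (e.getEmployeeSalary() > maxSalary) { 
       maxSalary = e.getEmployeeSalary(); 
      } 
     } 
     // Emit the incoming key with the maximum salary found. 
     collector.collect(key, new IntWritable(maxSalary)); 
    } 
} 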

P.S.: Please start your class names with a capital letter; not doing so breaks the Java naming convention.


Yes, it is working now, thank you for your valuable input – user1585111