
The Map class below throws a java.lang.NullPointerException, and I need help finding the cause and fixing it:

package org.myorg; 

import java.io.*; 
import java.util.*; 
import java.sql.*; 
import java.util.logging.Level; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class ParallelTraining { 

    public static class Map extends MapReduceBase implements 
      Mapper<LongWritable, Text, Text, Text> { 
     private final static LongWritable zero = new LongWritable(0); 
     private Text word = new Text(); 
     private Text outputvalue = new Text(); 

     public void map(LongWritable key, Text value, 
       OutputCollector<Text, Text> output, Reporter reporter) 
       throws IOException { 
      Configuration conf = new Configuration(); 
      int CountComputers; 

      FileInputStream fstream = new FileInputStream(
        "/export/hadoop-1.0.1/bin/countcomputers.txt"); 

      BufferedReader br = new BufferedReader(new InputStreamReader(fstream)); 
      String result=br.readLine(); 
      CountComputers=Integer.parseInt(result); 
      br.close(); 
      fstream.close(); 

      Connection con = null; 
      Statement st = null; 
      ResultSet rs = null;  
      String url = "jdbc:postgresql://192.168.1.8:5432/NexentaSearch"; 
      String user = "postgres"; 
      String password = "valter89"; 


      ArrayList<String> files = new ArrayList<String>(); 
      ArrayList<String> therms = new ArrayList<String>(); 
      ArrayList<Integer> idoffiles = new ArrayList<Integer>(); 
      ArrayList<Integer> countthermsinfiles = new ArrayList<Integer>(); 
      String currentFile; 
      int CountFiles=0; 
      int CountThermsInFile; 
      int CountFilesOnClusterNode; 
      int RemainFilesOnOneClusterNode; 
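      // Read (term value, file value, file id) rows from PostgreSQL and count
      // terms per file by watching the file column change between consecutive rows.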
      try 
      { 
       con = DriverManager.getConnection(url, user, password); 
         st = con.createStatement(); 
         rs = st.executeQuery("SELECT a.value, c.value, c.id FROM therms a INNER JOIN therms_occurs b ON b.therm_id = a.id INNER JOIN fs_entries c ON b.file_id = c.id ORDER BY a.value, c.value, c.id"); 
       rs.first(); 
       therms.add(rs.getString(1)); 
       CountThermsInFile=1; 
       currentFile=rs.getString(2); 
       idoffiles.add(rs.getInt(3)); 
       CountFiles=1; 
       while (rs.next()) 
       { 
        if (currentFile!=rs.getString(2)) 
         { 
          countthermsinfiles.add(CountThermsInFile); 
          CountFiles+=1; 
          currentFile=rs.getString(2); 
          CountThermsInFile=1; 
          idoffiles.add(rs.getInt(3)); 
         } 
        else 
         { 
          CountThermsInFile+=1; 
         }; 
        therms.add(rs.getString(1)); 
       }; 
       countthermsinfiles.add(CountThermsInFile); 
      } 
      catch (SQLException e) 
      { 
       System.out.println("Connection Failed! Check output console"); 
       e.printStackTrace(); 
      } 

      if (CountComputers!=2) 
      { 
       if (CountFiles%CountComputers==0) 
       { 
        CountFilesOnClusterNode=CountFiles/CountComputers; 
        RemainFilesOnOneClusterNode=0; 
       } 
       else 
       { 
        CountFilesOnClusterNode=CountFiles/(CountComputers-1); 
        RemainFilesOnOneClusterNode=CountFiles%(CountComputers-1); 
       }; 
      } 
      else 
      { 
       if (CountFiles%2==0) 
       { 
        CountFilesOnClusterNode=CountFiles/2; 
        RemainFilesOnOneClusterNode=0; 
       } 
       else 
       { 
        CountFilesOnClusterNode=CountFiles/2+1; 
        RemainFilesOnOneClusterNode=CountFiles-CountFilesOnClusterNode; 
       }; 
      }; 



      String[] ConcatPaths = new String[CountComputers]; 
      int NumberTherms = 0; 
      int currentCountOfTherms=0; 
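      // Concatenate the terms of each node's share of files into one
      // newline-separated string per cluster node.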
      if (RemainFilesOnOneClusterNode==0) 
      { 
       for (int i=0; i<CountComputers; i++) 
       { 
        for (int j=0; j<CountFilesOnClusterNode; j++) 
        { 
         for (int k=0; k<countthermsinfiles.get(i*CountFilesOnClusterNode+j); k++) 
          { 
           if (!(k==(countthermsinfiles.get(i*CountFilesOnClusterNode+CountFilesOnClusterNode-1)-1))) 
           { 
            ConcatPaths[i] +=therms.get(currentCountOfTherms+k)+"\n"; 
           } 
           else 
           { 
            ConcatPaths[i] +=therms.get(currentCountOfTherms+k); 
           }; 
          } 
         currentCountOfTherms+=countthermsinfiles.get(i*CountFilesOnClusterNode+j); 
        } 
       } 
      } 
      else 
      { 
       for (int i=0; i<CountComputers-1; i++) 
       {   
        for (int j=0; j<CountFilesOnClusterNode; j++) 
        { 
         for (int k=0; k<countthermsinfiles.get(i*CountFilesOnClusterNode+j); k++) 
          { 
           if (!(k==(countthermsinfiles.get(i*CountFilesOnClusterNode+CountFilesOnClusterNode-1)-1))) 
           { 
            ConcatPaths[i] +=therms.get(currentCountOfTherms+k)+"\n"; 
           } 
           else 
           { 
            ConcatPaths[i] +=therms.get(currentCountOfTherms+k); 
           }; 
          } 
         currentCountOfTherms+=countthermsinfiles.get(i*CountFilesOnClusterNode+j); 
        }   
       }; 
       for (int j=0; j<RemainFilesOnOneClusterNode; j++) 
        { 
         for (int k=0; k<countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j); k++) 
          { 
            if (!(k==(countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+(RemainFilesOnOneClusterNode-1))-1))) 
            { 
             ConcatPaths[CountComputers-1] +=therms.get(currentCountOfTherms+k)+"\n"; 
            } 
            else 
            { 
             ConcatPaths[CountComputers-1] +=therms.get(currentCountOfTherms+k); 
            } 
          } 
         currentCountOfTherms+=countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j); 
        };    
      }; 


      String[] ConcatCountOfThermsAndIds= new String[CountComputers]; 
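      // For each node, build a space-separated list of '<terms in file> <file id>' pairs.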
      if (RemainFilesOnOneClusterNode==0) 
      { 
       for (int i=0; i<CountComputers; i++) 
       { 
        for (int j=0; j<CountFilesOnClusterNode; j++) 
        { 
         ConcatCountOfThermsAndIds[i]+=Integer.toString(countthermsinfiles.get(i*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get(i*CountFilesOnClusterNode+j))+" "; 
        }; 
       }; 
      } 
      else 
      { 
       for (int i=0; i<CountComputers-1; i++) 
       { 
        for (int j=0; j<CountFilesOnClusterNode; j++) 
        { 
         ConcatCountOfThermsAndIds[i]+=Integer.toString(countthermsinfiles.get(i*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get(i*CountFilesOnClusterNode+j))+" "; 
        }; 
       }; 
       for (int j=0; j<RemainFilesOnOneClusterNode; j++) 
       { 
        ConcatCountOfThermsAndIds[CountComputers-1]+=Integer.toString(countthermsinfiles.get((CountComputers-1)*CountFilesOnClusterNode+j))+" "+Integer.toString(idoffiles.get((CountComputers-1)*CountFilesOnClusterNode+j))+" "; 
       }; 
      }; 

      for (int i = 0; i < ConcatPaths.length; i++) { 
       word.set(ConcatPaths[i]); 
       outputvalue.set(ConcatCountOfThermsAndIds[i]); 
       output.collect(word, outputvalue); 
      } 
     } 
    } 

When I run this Hadoop program on the cluster, I get the following output:

args[0]=/export/hadoop-1.0.1/bin/input 
13/05/01 14:53:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
13/05/01 14:53:45 INFO mapred.FileInputFormat: Total input paths to process : 1 
13/05/01 14:53:46 INFO mapred.JobClient: Running job: job_201304302334_0002 
13/05/01 14:53:47 INFO mapred.JobClient: map 0% reduce 0% 
13/05/01 14:58:00 INFO mapred.JobClient: map 50% reduce 0% 
13/05/01 14:58:01 INFO mapred.JobClient: Task Id : attempt_201304302334_0002_m_000000_0, Status : FAILED 
java.lang.NullPointerException 
    at org.apache.hadoop.io.Text.encode(Text.java:388) 
    at org.apache.hadoop.io.Text.set(Text.java:178) 
    at org.myorg.ParallelTraining$Map.map(ParallelTraining.java:206) 
    at org.myorg.ParallelTraining$Map.map(ParallelTraining.java:15) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:415) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1093) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

13/05/01 14:59:01 WARN mapred.JobClient: Error reading task outputmyhost3 
13/05/01 14:59:01 WARN mapred.JobClient: Error reading task outputmyhost3 
13/05/01 14:59:01 INFO mapred.JobClient: Job complete: job_201304302334_0002 
13/05/01 14:59:01 INFO mapred.JobClient: Counters: 21 
13/05/01 14:59:01 INFO mapred.JobClient: Job Counters 
13/05/01 14:59:01 INFO mapred.JobClient:  Launched reduce tasks=1 
13/05/01 14:59:01 INFO mapred.JobClient:  SLOTS_MILLIS_MAPS=39304 
13/05/01 14:59:01 INFO mapred.JobClient:  Total time spent by all reduces waiting after reserving slots (ms)=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Total time spent by all maps waiting after reserving slots (ms)=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Launched map tasks=5 
13/05/01 14:59:01 INFO mapred.JobClient:  Data-local map tasks=5 
13/05/01 14:59:01 INFO mapred.JobClient:  SLOTS_MILLIS_REDUCES=24216 
13/05/01 14:59:01 INFO mapred.JobClient:  Failed map tasks=1 
13/05/01 14:59:01 INFO mapred.JobClient: File Input Format Counters 
13/05/01 14:59:01 INFO mapred.JobClient:  Bytes Read=99 
13/05/01 14:59:01 INFO mapred.JobClient: FileSystemCounters 
13/05/01 14:59:01 INFO mapred.JobClient:  HDFS_BYTES_READ=215 
13/05/01 14:59:01 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=21557 
13/05/01 14:59:01 INFO mapred.JobClient: Map-Reduce Framework 
13/05/01 14:59:01 INFO mapred.JobClient:  Map output materialized bytes=6 
13/05/01 14:59:01 INFO mapred.JobClient:  Combine output records=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Map input records=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Spilled Records=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Map output bytes=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Total committed heap usage (bytes)=160763904 
13/05/01 14:59:01 INFO mapred.JobClient:  Map input bytes=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Combine input records=0 
13/05/01 14:59:01 INFO mapred.JobClient:  Map output records=0 
13/05/01 14:59:01 INFO mapred.JobClient:  SPLIT_RAW_BYTES=116 
13/05/01 14:59:01 INFO mapred.JobClient: Job Failed: # of failed Map Tasks exceeded allowed limit. FailedCount: 1. LastFailedTask: task_201304302334_0002_m_000000 
Exception in thread "main" java.io.IOException: Job failed! 
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1265) 
    at org.myorg.ParallelTraining.main(ParallelTraining.java:353) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:601) 
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156) 

The error points to the line

word.set(ConcatPaths[i]); 

What is the cause of the error, and what is producing it? Please help me get rid of it.


Please don't use DataInputStream to read text, and please remove it from your example; this bad idea keeps getting copied. http://vanillajava.blogspot.co.uk/2012/08/java-memes-which-refuse-to-die.html – 2013-06-24 07:07:27
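
For reference, a minimal sketch of what the commenter suggests, using the path and variable name from the question and assuming Java 7+ for try-with-resources (BufferedReader and FileReader come from java.io, which the question already imports):

int CountComputers;
// Read the single line that holds the node count; try-with-resources closes
// the reader (and the underlying file) even if parsing fails.
try (BufferedReader reader = new BufferedReader(
        new FileReader("/export/hadoop-1.0.1/bin/countcomputers.txt"))) {
    CountComputers = Integer.parseInt(reader.readLine().trim());
}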

Answer


I suspect that ConcatPaths[i] is null; word.set requires a non-null value.

Note that this is quite likely: the array can contain null at any given index. To work around it, you can add a null check before calling set:

if (ConcatPaths[i] != null) word.set(ConcatPaths[i]);

Alternatively, you can make sure a value is not null before you put it into ConcatPaths, or do both, and perhaps log an error if any value in the array is null when you do not expect that.
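
A minimal sketch of that second suggestion, using the array names from the question: initializing every slot to an empty string right after the arrays are created means the later += concatenation never starts from null, and word.set/outputvalue.set never receive null.

String[] ConcatPaths = new String[CountComputers];
String[] ConcatCountOfThermsAndIds = new String[CountComputers];
// Pre-fill every slot with "" so elements that no loop ever writes to stay
// empty strings instead of null, and += appends to "" rather than to null.
Arrays.fill(ConcatPaths, "");                 // Arrays is in java.util, already imported
Arrays.fill(ConcatCountOfThermsAndIds, "");

Whether an empty key or value is meaningful for this job is a separate question; the point of the sketch is only that set() never sees null.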
