Cassandra MapReduce gives a ClassCastException. Here is my code:

    package com.manual;

    import java.util.Arrays;
    import java.util.SortedMap;

    import org.apache.cassandra.db.IColumn;
    import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
    import org.apache.cassandra.hadoop.ConfigHelper;
    import org.apache.cassandra.thrift.SlicePredicate;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;

    public class CopyOfCassandraMapRed extends Configured implements Tool {
        private static final String KEYSPACE = "keyspace1";
        private static final String COLUMN_FAMILY = "users";
        private static final String CONF_COLUMN_NAME = "columnName";
        private static final String COLUMN_NAME = "name";
        private String OUTPUT_COLUMN_FAMILY = "OPt_CF";

        public static class CassandraMap extends
                Mapper<String, SortedMap<byte[], IColumn>, Text, LongWritable> {
            private final static LongWritable one = new LongWritable(1);
            private Text word = new Text();
            private String columnName;

            protected void map(String key, SortedMap<byte[], IColumn> columns,
                    Context context) throws java.io.IOException, InterruptedException {
                IColumn column = columns.get(columnName.getBytes());
                if (column == null) {
                    return;
                }

                word.set(new String(columnName));
                context.write(word, one);
            }

            protected void setup(Context context) throws java.io.IOException,
                    InterruptedException {
                this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
            }
        }

        public static class CassandraReduce extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            private LongWritable result = new LongWritable();

            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws java.io.IOException, InterruptedException {
                long sum = 0;
                for (LongWritable val : values) {
                    sum += val.get();
                }

                result.set(sum);
                context.write(key, result);
            }
        }

        public int run(String[] args) throws Exception {
            String outputPath = args[1];
            Configuration conf = getConf();

            conf.set(CONF_COLUMN_NAME, COLUMN_NAME);
            Job job = new Job(conf, "CassandraMapRed");
            job.setJarByClass(CopyOfCassandraMapRed.class);

            job.setMapperClass(CassandraMap.class);
            job.setCombinerClass(CassandraReduce.class);
            job.setReducerClass(CassandraReduce.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);

            job.setInputFormatClass(ColumnFamilyInputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(outputPath));

            ConfigHelper.setInputRpcPort(job.getConfiguration(), "9160");
            ConfigHelper.setInputInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setInputPartitioner(job.getConfiguration(),
                    "org.apache.cassandra.dht.RandomPartitioner");

            ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE,
                    COLUMN_FAMILY);
            ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE,
                    OUTPUT_COLUMN_FAMILY);

            SlicePredicate predicate = new SlicePredicate().setColumn_names(
                    Arrays.asList(ByteBufferUtil.bytes(COLUMN_NAME) /* ByteBufferUtil.bytes("text") */));

            ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

            System.out.println("running job now..");

            boolean success = job.waitForCompletion(true);

            return success ? 0 : 1;
        }
    }

I compile it with this main class:

    package com.manual;

    import org.apache.commons.lang.time.StopWatch;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.util.ToolRunner;

    public class Hassandra {

        private static final Log log = LogFactory.getLog(Hassandra.class);

        private static final String CASSANDRA = "cassandra";

        public static void main(String[] args) {
            Hassandra h = new Hassandra();
            // args[1] = "/home/user/Desktop/hadoopCass.op";
            System.exit(h.run(args));
        }

        public int run(String[] args) {
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();

            int status = -1;

            try {
                // status = ToolRunner.run((Tool) new MapReducerDemo(), args);
                status = ToolRunner.run(new CopyOfCassandraMapRed(), args);
            } catch (Exception e) {
                e.printStackTrace();
            }
            stopWatch.stop();
            log.info("response time: " + stopWatch.getTime());
            return status;
        }
    }

And I get this exception:

java.lang.Exception: java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to java.lang.String 
     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354) 
    Caused by: java.lang.ClassCastException: java.nio.HeapByteBuffer cannot be cast to java.lang.String 
     at com.manual.CopyOfCassandraMapRed$CassandraMap.map(CopyOfCassandraMapRed.java:1) 
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 
     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:364) 
     at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 
    14/01/28 14:54:28 INFO mapred.JobClient: map 99% reduce 0% 
    14/01/28 14:54:28 INFO mapred.JobClient: Job complete: job_local918272987_0001 
    14/01/28 14:54:28 INFO mapred.JobClient: Counters: 15 
    14/01/28 14:54:28 INFO mapred.JobClient: File Input Format Counters 
    14/01/28 14:54:28 INFO mapred.JobClient:  Bytes Read=0 
    14/01/28 14:54:28 INFO mapred.JobClient: FileSystemCounters 
    14/01/28 14:54:28 INFO mapred.JobClient:  FILE_BYTES_READ=142637420 
    14/01/28 14:54:28 INFO mapred.JobClient:  FILE_BYTES_WRITTEN=22830572 
    14/01/28 14:54:28 INFO mapred.JobClient: Map-Reduce Framework 
    14/01/28 14:54:28 INFO mapred.JobClient:  Map output materialized bytes=1536 
    14/01/28 14:54:28 INFO mapred.JobClient:  Combine output records=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Map input records=1 
    14/01/28 14:54:28 INFO mapred.JobClient:  Physical memory (bytes) snapshot=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Spilled Records=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Map output bytes=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Total committed heap usage (bytes)=190955126784 
    14/01/28 14:54:28 INFO mapred.JobClient:  CPU time spent (ms)=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Virtual memory (bytes) snapshot=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  SPLIT_RAW_BYTES=26532 
    14/01/28 14:54:28 INFO mapred.JobClient:  Map output records=0 
    14/01/28 14:54:28 INFO mapred.JobClient:  Combine input records=0 
    14/01/28 14:54:28 INFO manual.Hassandra: response time: 26230 

Can anyone tell me how to handle this exception?

These are the entries in my column family:

    [default@keyspace1] list users; 
    Using default limit of 100 
    Using default cell limit of 100 
    ------------------- 
    RowKey: key1 
    => (name=name, value=myName, timestamp=1390899337732000) 

    1 Row Returned. 

Answer


Switch from

    public static class CassandraMap extends
            Mapper<String, SortedMap<byte[], IColumn>, Text, LongWritable>

to

    public static class CassandraMap extends
            Mapper<ByteBuffer, SortedMap<byte[], IColumn>, Text, LongWritable>

ColumnFamilyInputFormat returns ByteBuffers, not Strings: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.cassandra/cassandra-all/2.0.1/org/apache/cassandra/hadoop/ColumnFamilyInputFormat.java
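
For completeness, here is a minimal sketch of what the corrected mapper might look like, assuming the rest of the question's code stays as posted. Note that the map() parameter type has to change together with the class declaration: if only the class declaration changes, map(String, ...) no longer overrides Mapper.map() and Hadoop silently runs the default identity mapper instead. Adding @Override makes the compiler catch exactly this kind of mismatch. (java.nio.ByteBuffer must be imported.)

    public static class CassandraMap extends
            Mapper<ByteBuffer, SortedMap<byte[], IColumn>, Text, LongWritable> {
        private final static LongWritable one = new LongWritable(1);
        private Text word = new Text();
        private String columnName;

        @Override // now genuinely overrides Mapper.map()
        protected void map(ByteBuffer key, SortedMap<byte[], IColumn> columns,
                Context context) throws java.io.IOException, InterruptedException {
            IColumn column = columns.get(columnName.getBytes());
            if (column == null) {
                return;
            }
            // Kept from the original code: emit the column name. If the row
            // key is needed as text, decode it with ByteBufferUtil.string(key)
            // rather than casting the ByteBuffer.
            word.set(columnName);
            context.write(word, one);
        }

        @Override
        protected void setup(Context context) throws java.io.IOException,
                InterruptedException {
            this.columnName = context.getConfiguration().get(CONF_COLUMN_NAME);
        }
    }

With this signature the key arrives as the raw ByteBuffer the input format actually produces, so the ClassCastException from the question should disappear.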

Yes, that was helpful. –

But it still doesn't work, so I gave up and tried another example. –

What is the exception this time? – ursus