Apache Spark with Cassandra behavior

I am writing a standalone Spark program that gets its data from Cassandra. I followed the examples and created the RDD via newAPIHadoopRDD() and the ColumnFamilyInputFormat class. The RDD is created, but I get a NotSerializableException when I call the RDD's .groupByKey() method:
public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);
    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf, "Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf, "Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
        ctx.newAPIHadoopRDD(jobConf,
            ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
            ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}
The exception:
java.io.NotSerializableException: java.nio.HeapByteBuffer
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:161)
    at org.apache.spark.scheduler.ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.Task.run(Task.scala:...)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
    at java.lang.Thread.run(Thread.java:662)
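From the trace, the failure happens in the shuffle write path, where Spark's default Java serializer (JavaSerializationStream) rejects java.nio.HeapByteBuffer because ByteBuffer does not implement java.io.Serializable. One thing I am considering as a workaround (just an untested assumption on my side) is switching to Kryo, which does not require Serializable:

SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("Test")
        // Assumption: Kryo serializes object fields reflectively and does not
        // require java.io.Serializable, so it may accept the ByteBuffer keys
        // where Java serialization throws NotSerializableException.
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");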
What I am trying to do is merge all the columns of each row key into a single entry. I get the same exception when I try to use the reduceByKey() method instead, like this:
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
                SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);
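Another workaround I have been sketching (untested) is to copy everything out of the ByteBuffers into plainly serializable types right after loading, before any shuffle. ByteBufferUtil below is org.apache.cassandra.utils.ByteBufferUtil, and hex strings are used as keys so that equals()/hashCode() behave correctly during the shuffle:

JavaPairRDD<String, Map<String, byte[]>> serializableRdd = rdd.mapToPair(
    new PairFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, String, Map<String, byte[]>>() {
        public Tuple2<String, Map<String, byte[]>> call(
                Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> pair) throws Exception {
            // Copy each column name/value out of its ByteBuffer; String and byte[]
            // are both Java-serializable, unlike HeapByteBuffer and IColumn.
            Map<String, byte[]> columns = new LinkedHashMap<String, byte[]>();
            for (IColumn column : pair._2().values()) {
                columns.put(ByteBufferUtil.bytesToHex(column.name()),
                            ByteBufferUtil.getArray(column.value()));
            }
            return new Tuple2<String, Map<String, byte[]>>(
                    ByteBufferUtil.bytesToHex(pair._1()), columns);
        }
    });
// A subsequent groupByKey() or reduceByKey() on serializableRdd shuffles only
// String and byte[] values, which Java serialization accepts.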
I am using:
- spark-1.0.0-bin-hadoop1
- Cassandra 1.2.12
- Java 1.6
Does anyone know what the problem is? What is it that fails to serialize?

Thanks,
Shai
Hi Jacek, thank you for your answer. The official DataStax Cassandra driver turned out to be the solution for me. – user3770713
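A minimal sketch of that driver-based approach, assuming the comment refers to the DataStax spark-cassandra-connector (CassandraJavaUtil.javaFunctions and cassandraTable are that library's japi entry points; keySpace, columnFamily, and host are the same names used above):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

// The connector reads rows as serializable CassandraRow objects, so shuffle
// operations like groupByKey()/reduceByKey() do not hit the ByteBuffer issue.
sparkConf.set("spark.cassandra.connection.host", host);
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<CassandraRow> rows = javaFunctions(ctx).cassandraTable(keySpace, columnFamily);
System.out.println(rows.count());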