2017-08-06

Sorting a JavaPairRDD by key when the key type is Tuple2<Integer, Integer> in Apache Spark

I want to sort my JavaPairRDD by its keys, so I wrote a comparator like this:

JavaPairRDD<Tuple2<Integer, Integer>, Integer> Rresult=result.sortByKey(new Comparator<Tuple2<Integer, Integer>>() { 
    @Override 
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) { 
     if(o1._1()==o2._1()) 
      return o1._2()-o2._2(); 
     return o1._1()-o2._1(); 
     } 
},true); 

This sorts by the first element of the tuple and, when the first elements are equal, by the second element.
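The intended ordering can be illustrated without Spark. The following is a minimal sketch, not the code from the question: it assumes `java.util.Map.Entry<Integer, Integer>` as a stand-in for `scala.Tuple2<Integer, Integer>`, and the class name `PairOrderSketch` and its methods are hypothetical names chosen for illustration.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PairOrderSketch {

    // Compare by the first component; fall back to the second only on a tie.
    // Integer.compare works on the int values, so it does not depend on
    // boxed-object identity and cannot overflow the way subtraction can.
    static int compare(Map.Entry<Integer, Integer> a, Map.Entry<Integer, Integer> b) {
        int byFirst = Integer.compare(a.getKey(), b.getKey());
        return byFirst != 0 ? byFirst : Integer.compare(a.getValue(), b.getValue());
    }

    // Builds a small unsorted sample (assumed data) and returns it sorted.
    static List<Map.Entry<Integer, Integer>> sortedSample() {
        List<Map.Entry<Integer, Integer>> pairs = new ArrayList<>();
        pairs.add(new SimpleImmutableEntry<>(2, 444));
        pairs.add(new SimpleImmutableEntry<>(3, 333));
        pairs.add(new SimpleImmutableEntry<>(1, 111));
        pairs.add(new SimpleImmutableEntry<>(2, 222));
        pairs.sort(PairOrderSketch::compare);
        return pairs;
    }

    public static void main(String[] args) {
        // Each entry prints as first=second
        System.out.println(sortedSample());
    }
}
```

The two pairs whose first component is 2 end up next to each other, ordered by their second component.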

But I get the following error stack trace:

java.lang.reflect.InvocationTargetException 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 

.. scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1083) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447) 
    at 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 
    at java.io.ObjectStrea 

Answer


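How are you creating the JavaPairRDD? Please check it before applying the sort. You will also get a "Task not serializable" exception if you pass a new anonymous Comparator directly to the sortByKey method. You should implement Comparator and Serializable in a separate class and pass an instance of that class to sortByKey. Here is a sample for your reference.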

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSortSample {
    public static void main(String[] args) {
        // Create a local SparkSession and wrap its context for the Java API
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSortSample")
                .master("local[1]")
                .getOrCreate();
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // Sample data: ((first, second), value)
        List<Tuple2<Tuple2<Integer, Integer>, Integer>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>(new Tuple2<>(2, 444), 4444));
        inputList.add(new Tuple2<>(new Tuple2<>(3, 333), 3333));
        inputList.add(new Tuple2<>(new Tuple2<>(1, 111), 1111));
        inputList.add(new Tuple2<>(new Tuple2<>(2, 222), 2222));

        // Build the JavaPairRDD from the sample list
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> javaPairRdd = jsc.parallelizePairs(inputList);

        // Sort ascending by key using the serializable comparator
        JavaPairRDD<Tuple2<Integer, Integer>, Integer> sortedPairRDD =
                javaPairRdd.sortByKey(new TupleComparator(), true);
        sortedPairRDD.foreach(pair -> System.out.println("sort = " + pair));

        // close() also stops the underlying SparkContext
        jsc.close();
    }
}

Here is the TupleComparator class, which implements both the Comparator and Serializable interfaces.

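import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

// Serializable so that Spark can ship the comparator along with the task
class TupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {
    @Override
    public int compare(Tuple2<Integer, Integer> o1, Tuple2<Integer, Integer> o2) {
        // Integer.compare avoids == on boxed Integers (a reference comparison) and subtraction overflow
        int byFirst = Integer.compare(o1._1(), o2._1());
        return byFirst != 0 ? byFirst : Integer.compare(o1._2(), o2._2());
    }
}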
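
To see why the separate Serializable class matters, the difference can be checked with plain Java serialization, outside Spark. This is only a sketch under the assumption that the comparator passed to sortByKey is serialized with standard Java serialization; the class `SerializableCheckSketch` and all of its members are hypothetical names, and it compares plain Integer values rather than Tuple2 keys to stay self-contained.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Comparator;

public class SerializableCheckSketch {

    // Named comparator that is explicitly Serializable (same shape as TupleComparator).
    static class NamedComparator implements Comparator<Integer>, Serializable {
        @Override
        public int compare(Integer a, Integer b) {
            return Integer.compare(a, b);
        }
    }

    // Anonymous comparator: Comparator alone does not extend Serializable.
    static Comparator<Integer> plainAnonymousComparator() {
        return new Comparator<Integer>() {
            @Override
            public int compare(Integer a, Integer b) {
                return Integer.compare(a, b);
            }
        };
    }

    // Lambda made serializable through an intersection cast.
    static Comparator<Integer> serializableLambda() {
        return (Comparator<Integer> & Serializable) (a, b) -> Integer.compare(a, b);
    }

    // Returns true if Java serialization can write the object.
    static boolean isJavaSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("anonymous: " + isJavaSerializable(plainAnonymousComparator()));
        System.out.println("named:     " + isJavaSerializable(new NamedComparator()));
        System.out.println("lambda:    " + isJavaSerializable(serializableLambda()));
    }
}
```

The intersection-cast lambda is shown only as a possible alternative to a separate class; whether it suits a given job is a design choice, and the named class is the approach used in the sample above.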