2015-05-25 62 views
6

我有一个自定义数据源,我想将数据加载到我的Spark群集中以执行一些计算。为此,我看到我可能需要为我的数据源实施新的RDD在Java中实现自定义Spark RDD

我是一个完整的Scala noob,我希望我可以在Java本身实现RDD。我环顾了互联网,找不到任何资源。任何指针?

我的数据在S3中,并在Dynamo中编入索引。例如,如果我想要在给定时间范围内加载数据,则首先需要查询Dynamo以获取相应时间范围的S3文件密钥,然后将它们加载到Spark中。这些文件可能并不总是具有相同的S3路径前缀,因此sc.testFile("s3://directory_path/")将不起作用。

我正在寻找有关如何实现与HadoopRDDJdbcRDD类似的东西的指针,但使用Java。类似于他们在这里完成的事情:DynamoDBRDD。这个从Dynamo读取数据,我的自定义RDD将查询DynamoDB的S3文件密钥,然后从S3加载它们。

+1

一个'RDD'是一个相当柔性容器。你为什么认为你需要重新实现它?你的数据的格式是什么? – ohruunuruus

+0

我的数据在S3中,并在Dynamo中编入索引。例如,如果我想要在给定时间范围内加载数据,则首先需要查询Dynamo以获取相应时间范围的S3文件密钥,然后将它们加载到Spark中。这些文件可能并不总是处于相同的S3路径前缀中,因此'''sc.testFile(“s3:// directory_path /”)'''将不起作用。我正在寻找关于如何实现类似于HadoopRDD或JdbcRDD的指针,但使用Java。 –

+0

根据此:http://apache-spark-user-list.1001560.n3.nabble.com/is-there-any-easier-way-to-define-a-custom-RDD-in-Java-td6917 .html一年前是不可能的。不过,我很想知道是否有任何改变。 – tsiki

回答

1

一个选项是阅读Hadoop规范,但是如果您的数据是结构化的Spark SQL有一个新的Data Sources API,某些实现发布在Spark Packages上,包括avro,redshift和csv。

8

您可以在Java中扩展RDD并实现getPartitions和计算方法。

Java可以扩展Scala类,但有一些限制。

实施例:

package com.openmarket.danyal; 
// Other imports left out 
import org.apache.spark.Dependency; 
import org.apache.spark.Partition; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.TaskContext; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.rdd.RDD; 

import scala.collection.AbstractIterator; 
import scala.collection.Iterator; 
import scala.collection.mutable.ArrayBuffer; 
import scala.reflect.ClassManifestFactory$; 
import scala.reflect.ClassTag; 

public class AlphaTest { 
    private static final ClassTag<String> STRING_TAG = ClassManifestFactory$.MODULE$.fromClass(String.class); 

    public static void main(final String[] args) { 
     SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Learn ABCs"); 
     try(JavaSparkContext sc = new JavaSparkContext(conf)) { 
      System.out.println(new AlphabetRDD(sc.sc()).toJavaRDD().collect()); 
     } 
    } 

    public static class AlphabetRDD extends RDD<String> { 
     private static final long serialVersionUID = 1L; 

     public AlphabetRDD(SparkContext sc) { 
      super(sc, new ArrayBuffer<Dependency<?>>(), STRING_TAG); 
     } 

     @Override 
     public Iterator<String> compute(Partition arg0, TaskContext arg1) { 
      AlphabetRangePartition p = (AlphabetRangePartition)arg0; 
      return new CharacterIterator(p.from, p.to); 
     } 

     @Override 
     public Partition[] getPartitions() { 
      return new Partition[] {new AlphabetRangePartition(1, 'A', 'M'), new AlphabetRangePartition(2, 'P', 'Z')}; 
     } 

    } 

    /** 
    * A partition representing letters of the Alphabet between a range 
    */ 
    public static class AlphabetRangePartition implements Partition { 
     private static final long serialVersionUID = 1L; 
     private int index; 
     private char from; 
     private char to; 

     public AlphabetRangePartition(int index, char c, char d) { 
      this.index = index; 
      this.from = c; 
      this.to = d; 
     } 

     @Override 
     public int index() { 
      return index; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if(!(obj instanceof AlphabetRangePartition)) { 
       return false; 
      } 
      return ((AlphabetRangePartition)obj).index != index; 
     } 

     @Override 
     public int hashCode() { 
      return index(); 
     } 
    } 

    /** 
    * Iterators over all characters between two characters 
    */ 
    public static class CharacterIterator extends AbstractIterator<String> { 
     private char next; 
     private char last; 

     public CharacterIterator(char from, char to) { 
      next = from; 
      this.last = to; 
     } 

     @Override 
     public boolean hasNext() { 
      return next <= last; 
     } 

     @Override 
     public String next() { 
      // Post increments next after returning it 
      return Character.toString(next++); 
     } 
    } 
} 
+0

伟大的解决方案。一个问题:从0开始分区索引,否则这个自定义RDD上的'cartesian()'方法将导致ArrayOutOfBoundException异常。 '返回新分区[] {new AlphabetRangePartition(0,'A','M'),new AlphabetRangePartition(1,'P','Z')};' –