2017-02-18 20 views
0

我正在尝试将hadoop代码迁移到spark中。我已经有了一些预定义的函数,我应该可以在spark中重用,因为它们仅仅是java代码,没有太多的hadoop依赖性。我有一个函数接受文本格式的输入(空间数据 - 经度,纬度)并将它们转换为形状(多边形,线流等)。当我尝试在Spark中读取它时,我首先以String的形式读取每行文件。然后将它们转换为文本,以便我可以使用我以前创建的函数。但是我有两个疑问,首先看起来JavaRDD没有使用文本,并且我正在收到一些问题。其次,将文本转换为形状的功能不会返回任何内容。但我无法使用flatMap或任何其他映射技术。我甚至不确定我的方法是否正确。在JavaRDD中使用文本数据类型并在FlatMap中返回void

这里是我的代码模型:

/*function for converting Text to Shape*/ 
public interface TextSerializable { 
public Text toText(Text text); 
public void fromText(Text text); 
* Retrieve information from the given text. 
* @param text The text to parse 
*/ 
} 



/*Shape Class looks something like this*/ 

public interface Shape extends Writable, Cloneable, TextSerializable { 
/
* Returns minimum bounding rectangle for this shape. 
* @return The minimum bounding rectangle for this shape 
*/ 
public Rectangle getMBR(); 

/** 
* Gets the distance of this shape to the given point. 
* @param x The x-coordinate of the point to compute the distance to 
* @param y The y-coordinate of the point to compute the distance to 
* @return The Euclidean distance between this object and the given point 
*/ 
...... 
...... 
......*/ 

/*My code structure*/ 

SparkConf conf = new SparkConf().setAppName("XYZ").setMaster("local"); 
JavaSparkContext sc =new JavaSparkContext(conf); 

final Text text=new Text(); 

JavaRDD<String> lines = sc.textFile("ABC.csv"); 

lines.foreach(new VoidFunction<String>(){ 
public void call(String lines){ 
     text.set(lines); 
     System.out.println(text); 
    } 
    }); 

/*Problem*/ 
text.flatMap(new FlatMapFunction<Text>(){ 
    public Iterable<Shape> call(Shape s){ 
     s.fromText(text); 
     //return void; 
    } 

代码的最后一行是错误的,但我不知道如何解决它。 JavaRDD可以与用户定义的类一起使用(根据我的知识)。我甚至不确定我是否已经将字符串行转换为文本文本(如果RDD中允许的话)。我在Spark中是全新的。任何形式的帮助都会很棒。

回答

0

你完全脱离了这个概念。首先,您不能在任何对象上调用像map,flatmap等函数,只能从JavaRDD调用它们,而Text不是JavaRDD和Spark do支持文本,而不是您使用它的方式。

现在来到你的问题,因为你想将字符串转换为文本格式,使用这样的事情

SparkConf conf = new SparkConf().setAppName("Name of Application"); 
    JavaSparkContext sc = new JavaSparkContext(conf); 
    JavaRDD<String> logData = sc.textFile("replace with address of file"); 

/*This map function will take string as input because we are calling it on javaRDD logData and that logData return string type value. This map fucntion will give Text as output 
you can replace the return statement with logic of your toText function(However new Text(s) is also a way to convert string into Text) but remember use of return is mandatory so apply logic accordingly 
     */ 
     JavaRDD<Text> rddone = logData.map(new Function<String,Text>(){ 
      public Text call(String s) 
      {// type logic of your toText() function here 
      return new Text(s);}}); 

现在,当我们调用flatmap功能在JavaRDD rddone将需要输入的文本,因为rddone的输出是文本,它可以给你任何你想要的输出。

/* This flatmap fucntion will take Text as input and will give iterator over object */ 
    JavaRDD <Object> empty = rddone.flatMap(new FlatMapFunction<Text,Object>(){ 
      public Iterator<Object> call(Text te) 
      { 
       // here you can call your fromText(te) method. 
       return null; 
     } 
     }); 

也请参阅有关更多详细信息http://spark.apache.org/docs/latest/programming-guide.html

http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html

+0

非常感谢这些链接。我有一个想法如何去做。 – SGh

相关问题