2014-09-30 73 views
2

如何迭代JavaPairRDD。我已经完成了一个小组,并返回了一个RDD,如下所示:JavaPairRDD(Tuple 7字符串和对象列表)Spark JavaPairRDD迭代

现在我必须遍历此RDD并在Pig中执行一些计算,例如FOR EACH。 基本上我想迭代键和值的列表,并做一些操作,然后返回一个JavaPairRDD?

JavaPairRDD<Tuple7<String, String,String,String,String,String,String>, List<Records>> sizes =  
piTagRecordData.groupBy(new Function<Records, Tuple7<String, String,String,String,String,String,String>>() { 
     private static final long serialVersionUID = 2885738359644652208L; 
     @Override 
     public Tuple7<String, String,String,String,String,String,String> call(Records row) throws Exception { 
      Tuple7<String, String,String,String,String,String,String> compositeKey = new Tuple7<String, String, String, String, String, String, String>(row.getAsset_attribute_id(),row.getDate_time_value(),row.getOperation(),row.getPi_tag_count(),row.getAsset_id(),row.getAttr_name(),row.getCalculation_type()); 
      return compositeKey; 
     } 
    }); 

此我要为大小的每个成员(JavaPairRDD)执行后,操作 - 像

rejected_records = FOREACH sizes GENERATE FLATTEN(Java function on the List of Records based on the group key 

我使用星火0.9.0

+0

到目前为止,你能展示一些你的工作吗? – Anas 2014-09-30 12:43:59

+0

@Anas - 更新我的评论 – 2014-09-30 15:40:33

回答

1

即使您正在讨论“FOR EACH”,它实际上听起来像是您想要的flatMap操作,因为您想要生成新值并将其变平。这适用于Java RDD,包括JavaPairRDD

+0

你能告诉我们如何迭代JavaPairRDD someRDDName。我想获取这个rdd的值,对字符串进行一些操作,然后将其保存到cassandra。 – 2016-02-11 14:10:11

1
+0

以上但是对于每一个都不会返回任何东西。如果我想在手术后坚持下去,怎么办? – 2014-09-30 15:25:27

+1

此方法是[action](http://spark.apache.org/docs/latest/programming-guide.html#actions)。它准备与外部存储系统进行交互。如果您想先执行任何操作,则可以在保存数据之前执行操作,或者先执行[转换](http://spark.apache.org/docs/latest/programming-guide.html#transformations)。例如,您可以使用map或mapPartitions。 – ajnavarro 2014-10-01 07:05:28

1

,如果你想查看JavaPairRDD的一些价值,我会做这样的

for (Tuple2<String, String> test : pairRdd.take(10)) //or pairRdd.collect() 
      { 
       System.out.println(test._1); 
       System.out.println(test._2); 
      } 

注:Tuple2(假设你有JavaPairRDD内字符串),根据存储在数据类型更改数据类型JavaPairRDD。

相关问题