
Building up an RDD in Spark by recursively unioning in Scala

So I'm fairly new to functional programming, and to Spark and Scala, so forgive me if this is obvious... but basically I have a list of HDFS files that match certain criteria, i.e. something like this:

val fileList = List( 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000140_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=03/000258_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=05/000270_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000297_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=30/000300_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000362_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=29/000365_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000397_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=15/000436_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=16/000447_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=01/000529_0", 
"hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/partday=17/000585_0") 

I now need to build a single RDD to work with out of this list... my idea was to use a recursive union... basically a function something like this:

// Turn one HDFS path into an RDD of its lines 
// (assumes a SparkContext named sc is in scope) 
def dostuff(line: String): org.apache.spark.rdd.RDD[String] = { 
  sc.textFile(line) 
} 

and then just apply it over the list with a map:

val rddList = fileList.map(dostuff) 
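
For reference, a minimal sketch of how that list of RDDs could then be collapsed into one (assuming the fileList above and a live SparkContext named sc); SparkContext.union takes a whole sequence of RDDs, which avoids chaining ++ by hand:

import org.apache.spark.rdd.RDD 

// one RDD per path, then a single union over the whole sequence 
val perFile: List[RDD[String]] = fileList.map(sc.textFile(_)) 
val combined: RDD[String] = sc.union(perFile) 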

Answer


You can read all of the files into a single RDD like this:

val sc = new SparkContext(...) 
sc.textFile("hdfs:///hive/some.db/BigAssHiveTable/partyear=2014/partmonth=06/*/*") 
    .map(line => ...) 
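
If the wildcard would sweep in more files than the original list actually contains, note that sc.textFile also accepts a comma-separated list of paths (it is backed by Hadoop's FileInputFormat), so the exact list from the question can be read in one call; a sketch, reusing the fileList from above:

// read exactly the files in fileList, nothing else the glob might match 
val rdd = sc.textFile(fileList.mkString(",")) 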

Brilliant! Thanks! Should have thought of that... Follow-up question, though... so I now have something like this: –