2014-03-19 50 views
1

我想使用spark shell从HDFS加入两个文件。 这两个文件是制表符分隔,我想加入的第二列在Spark中加入两个HDFS文件

试过代码 但不给任何输出

val ny_daily= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock /NYSE_daily")) 

val ny_daily_split = ny_daily.map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 


val ny_dividend= sc.parallelize(List("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends")) 

val ny_dividend_split = ny_dividend.map(line =>line.split('\t')) 

val enKeyValuePair1 = ny_dividend_split.map(line => (line(0).substring(0, 4),  line(3).toInt)) 

enKeyValuePair1.join(enKeyValuePair) 

但我没有得到有关如何加入对特定列 文件的任何信息请建议

回答

4

我没有得到有关如何加入对特定列文件的任何信息

RDDS都加入了他们的钥匙,让你决定列加入对当你说:

val enKeyValuePair = ny_daily_split.map(line => (line(0).substring(0, 5), line(3).toInt)) 
... 
val enKeyValuePair1 = ny_daily_split.map(line => (line(0).substring(0, 4), line(3).toInt)) 

你RDDS会从line(0).substring(0, 5)line(0).substring(0, 4)里的值进行连接。

您可以找到join函数(以及许多其他有用的函数)hereSpark Programming Guide是了解Spark如何工作的很好的参考。

试过代码,但不给任何输出

为了看到输出,你要问火花打印:

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

注:从加载数据您应该使用的文件sc.textFile()sc.parallelize()仅用于使RDD远离Scala集合。

下面的代码应该做的工作:

val ny_daily_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_daily").map(line =>line.split('\t')) 
val ny_dividend_split = sc.textFile("hdfs://localhost:8020/user/user/NYstock/NYSE_dividends").map(line =>line.split('\t')) 

val enKeyValuePair = ny_daily_split.map(line => line(0).substring(0, 5) -> line(3).toInt) 
val enKeyValuePair1 = ny_dividend_split.map(line => line(0).substring(0, 4) -> line(3).toInt) 

enKeyValuePair1.join(enKeyValuePair).foreach(println) 

顺便问一下,你提到你想加入第二列,但你实际使用的是line(0),这是故意的吗?

希望这会有所帮助!

+0

什么我应该把JOIN的关键和价值,因为我想加入列和作为输出我应该能够看到整个加入数据集 –

+0

然后改变你的'地图'功能''ny_daily_split.map(line =>线(1) - > line.mkString(“\ t”))''和'ny_dividend_split.map(line => line(1) - > line.mkString(“\ t”))''。 – fedragon

相关问题