2015-06-05 72 views
2

我有一个RDD,其中有很多列(例如数百个),我的大部分操作都在列上。我需要从不同列创建许多中间变量。Spark Spark RDD上的列操作

什么是最有效的方式做到这一点?

例如,如果我的dataRDD[Array[String]]是象下面这样:

123, 523, 534, ..., 893 
536, 98, 1623, ..., 98472 
537, 89, 83640, ..., 9265 
7297, 98364, 9, ..., 735 
...... 
29, 94, 956, ..., 758 

我将需要创建一个新的列或newCol1 = 2ndCol+19thCol变量,并根据newCol1另一个新柱和现有列:newCol2 = function(newCol1, 34thCol)

这样做的最好方法是什么?

我一直在使用索引的中间变量和dataRDD思考,然后加入他们一起对指数做我的计算:

var dataRDD = sc.textFile("/test.csv").map(_.split(",")) 
val dt = dataRDD.zipWithIndex.map(_.swap) 
val newCol1 = dataRDD.map(x => x(1)+x(18)).zipWithIndex.map(_.swap) 
val newCol2 = newCol1.join(dt).map(x=> function(.........)) 

是否有这样做的更好的办法?

回答

1

为什么不直接做这一切在一个:

var dataRDD = sc.textFile("/test.csv").map(_.split(",")) 
dataRDD.map(x=>{ 
    val newCol = x(1) + x(18) 
    val newCol2 = function(newCol, x(33)) 
    //anything else you need to do 
    newCol +: newCol2 +: x //This will return the original array with the new columns prepended 
    //x +: newCol +: newCol2 //Alternatively, this will return the original array with the new columns appended 
}) 
+0

感谢贾斯汀您的回复。我可能误解了你的观点。但是,当我尝试类似“dataRDD.map(x => {val a = x(1)})。collect”时,我得到了如Array [Unit] = Array((),(),(), ),(),(),(),(),(),())。我在这里错过了什么吗? – Carter

+0

对不起,我猜你对Scala并不太熟悉。函数中的最后一个语句是返回值。在列出的情况下,变量赋值的结果是一个Unit或()。我修改了我的代码,使这个更明显的如何完全做出改变 –

+0

谢谢贾斯汀!是的,我是Scala的新手,所以不熟悉一些概念。抱歉。 – Carter