2016-10-18

Spark: transform an RDD

I have a CSV file called input, like this:

time,col1,col2,col3 
0,5,8,9 
1,6,65,3 
2,5,8,465,4 
3,85,45,8 

The number of columns is unknown, and I want the result as an RDD in this format:

(constant,column,time,value) 

That is: ((car1,col1,0,5),(car1,col2,0,8)..)
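To make the target shape concrete, here is a minimal sketch in plain Scala (no Spark) of the per-line transformation: given the header and one data line, it emits one (constant, column, time, value) tuple for each non-time column. It assumes the first column is always `time`. `WideToLong` and `lineToRecords` are hypothetical names. Because the function works on one line at a time, the same logic could be passed to `RDD.flatMap`.

```scala
object WideToLong {
  // Hypothetical helper: turns one CSV data line into (constant, column, time, value) tuples.
  // Assumes the first header column is "time" and every other column holds a value.
  def lineToRecords(constant: String, header: Array[String], line: String): Array[(String, String, String, String)] = {
    val fields = line.split(",").map(_.trim)
    val time = fields(0)
    // zip pairs each column name with its value and drops any extra trailing fields
    header.tail.zip(fields.tail).map { case (column, value) => (constant, column, time, value) }
  }

  def main(args: Array[String]): Unit = {
    val header = "time,col1,col2,col3".split(",").map(_.trim)
    // prints (car1,col1,0,5), (car1,col2,0,8) and (car1,col3,0,9), one per line
    lineToRecords("car1", header, "0,5,8,9").foreach(println)
  }
}
```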

I have RDDs for the time, the rows and the header:

    class SimpleCSVHeader(header: Array[String]) extends Serializable {
      val index = header.zipWithIndex.toMap
      def apply(array: Array[String], key: String): String = array(index(key))
    }

    val constant = "car1"

    val csv = sc.textFile("C:\\file.csv")

    val data = csv.map(line => line.split(",").map(elem => elem.trim))

    val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line
    val rows = data.filter(line => header(line, "time") != "time") // filter the header out
    val time = rows.map(row => header(row, "time"))

but I don't know how to create the resulting RDD from here.
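One way to continue from the `rows` and `header` values above is a single `flatMap` that reuses the question's `SimpleCSVHeader` to look up each value by column name. The sketch below runs the same logic on a local `Seq` so it works without a Spark cluster; on the real RDDs the `flatMap` body is the same, except that the header line would come from `data.first()` instead of `data.head`. `FromHeader`, `transform` and `columnNames` are hypothetical names. It assumes every data row has at least as many fields as the header; shorter rows would throw an `ArrayIndexOutOfBoundsException`.

```scala
// Same class as in the question; Serializable so Spark can ship it to executors.
class SimpleCSVHeader(header: Array[String]) extends Serializable {
  val index: Map[String, Int] = header.zipWithIndex.toMap
  def apply(array: Array[String], key: String): String = array(index(key))
}

object FromHeader {
  // Local stand-in for the RDD pipeline: data, header and rows mirror the question's values.
  def transform(lines: Seq[String], constant: String): Seq[(String, String, String, String)] = {
    val data = lines.map(line => line.split(",").map(_.trim))
    val header = new SimpleCSVHeader(data.head)
    val rows = data.filter(line => header(line, "time") != "time") // filter the header out
    // Every header column except "time" becomes one output record per row.
    val columnNames = data.head.filter(_ != "time")
    rows.flatMap { row =>
      columnNames.map(col => (constant, col, header(row, "time"), header(row, col))).toSeq
    }
  }

  def main(args: Array[String]): Unit =
    transform(Seq("time,col1,col2,col3", "0,5,8,9", "1,6,65,3"), "car1").foreach(println)
}
```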

Answer


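My suggestion is to use DataFrames rather than RDDs for your scenario. Still, I've tried to give you a working RDD solution; how well it performs will depend on your data volume.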
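To illustrate the DataFrame suggestion before the RDD version: one common way to unpivot wide columns in Spark SQL is the `stack` generator. The helper below only builds the expression string, so it runs without a Spark session; `StackExpr` and `stackExpr` are hypothetical names. It assumes all value columns share one type (true if the CSV is read without schema inference, since every column is then a string) and that column names contain no quotes or backticks. Spark 3.4 and later also offer `Dataset.unpivot` as a built-in alternative.

```scala
object StackExpr {
  // Hypothetical helper: builds a Spark SQL `stack` expression that unpivots the given
  // columns into (column, value) rows. Assumes names contain no quotes or backticks.
  def stackExpr(valueColumns: Seq[String]): String = {
    val pairs = valueColumns.map(c => s"'$c', `$c`").mkString(", ")
    s"stack(${valueColumns.size}, $pairs) as (column, value)"
  }

  def main(args: Array[String]): Unit = {
    println(stackExpr(Seq("col1", "col2", "col3")))
    // With a DataFrame `df` read from the CSV (needs Spark SQL on the classpath):
    //   val valueCols = df.columns.filter(_ != "time").toSeq
    //   val long = df
    //     .selectExpr("'car1' as constant", "time", stackExpr(valueCols))
    //     .select("constant", "column", "time", "value")
  }
}
```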

    import org.apache.spark.{SparkConf, SparkContext}

    val lines = Array("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")

    // Any SparkContext works here; this creates a local one
    val sc = new SparkContext(new SparkConf().setAppName("wide-to-long").setMaster("local[*]"))
    val baseRDD = sc.parallelize(lines)
    val headerLine = baseRDD.first()

    // Prepare the column list, e.g. Array(time, col1, col2, col3).
    // This step can be avoided if you use DataFrames.
    val columns = headerLine.split(",").map(_.trim)

    val mapRDD = baseRDD
      .filter(_ != headerLine) // skip the header row itself
      .flatMap { line =>
        val splits = line.split(",").map(_.trim)
        // One tuple per non-time column, so the number of columns need not be known in advance.
        // zip stops at the shorter array, so extra trailing fields are ignored.
        // Replace the tuples with your case classes
        columns.tail.zip(splits.tail).map { case (col, value) => ("car1", col, splits(0), value) }
      }

    mapRDD.collect().foreach(println)

Result:

(car1,col1,0,5)
(car1,col2,0,8)
(car1,col3,0,9)
(car1,col1,1,6)
(car1,col2,1,65)
(car1,col3,1,3)
(car1,col1,2,5)
(car1,col2,2,8)
(car1,col3,2,465)
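The code comment about replacing tuples with case classes could be followed like this. The sketch is a minimal version in plain Scala so it runs without Spark, using a hypothetical `Reading` case class. It also converts `time` and `value` to `Long`, on the assumption that every field is an integer as in the sample data. The `flatMap` has the same shape as in the RDD version. With an RDD, it is safest to define the case class at top level rather than inside a method. After `import spark.implicits._`, calling `.toDF()` on an RDD of such case classes gives a DataFrame with named columns.

```scala
// Hypothetical record type for one (constant, column, time, value) entry.
case class Reading(constant: String, column: String, time: Long, value: Long)

object CaseClassVersion {
  // Assumes the first line is the header and every field is an integer.
  def toReadings(lines: Seq[String], constant: String): Seq[Reading] = {
    val columns = lines.head.split(",").map(_.trim)
    lines.tail.flatMap { line =>
      val fields = line.split(",").map(_.trim)
      // zip drops extra trailing fields, as in the tuple version
      columns.tail.zip(fields.tail).map { case (col, v) =>
        Reading(constant, col, fields(0).toLong, v.toLong)
      }.toSeq
    }
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("time,col1,col2,col3", "0,5,8,9", "1,6,65,3", "2,5,8,465,4")
    toReadings(lines, "car1").foreach(println)
  }
}
```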