2017-06-16

How to append the last record of each change in Scala

I am new to Scala. So far I have filtered data from a large dataset and printed it as CSV. The CSV I print is in the following format:

id   time        status 
___  _____       _________ 
1  2016-10-09 00:09:10     100 
1  2016-10-09 00:09:30     100 
1  2016-10-09 00:09:50     100 
1  2016-10-09 00:10:10     900 
2  2016-10-09 00:09:18     100 
2  2016-10-09 00:09:20     100 
2  2016-10-09 00:10:24     900 
3  2016-10-09 00:09:30     100 
3  2016-10-09 00:09:33     100 
3  2016-10-09 00:09:36     100 
3  2016-10-09 00:09:39     100 
3  2016-10-09 00:09:51     900 

I print the data using the code below:

 var count = 0
 val statusList = ListBuffer[String]()
 for (currentRow <- sortedRow) {
   if (currentRow.status == 100) {
     statusList += currentRow.id + "," + currentRow.time + "," + currentRow.status
   }
   if ((count + 1) < sortedRow.size && sortedRow(count + 1).status == 900) {
     statusList += sortedRow(count + 1).id + "," + sortedRow(count + 1).time + "," + sortedRow(count + 1).status
   }
   count += 1
 }

With this I want to print the rows with status 100 and append, to each of them, the record where the status changes (status 900). Basically the data I want printed is as follows:

id  time    status id  change_time   status 
___  _____    _________ __ ______________  _______ 
1 2016-10-09 00:09:10  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:30  100  1  2016-10-09 00:10:10 900 
1 2016-10-09 00:09:50  100  1  2016-10-09 00:10:10 900 
2 2016-10-09 00:09:18  100  2  2016-10-09 00:10:24 900 
2 2016-10-09 00:09:20  100  2  2016-10-09 00:10:24 900 
3 2016-10-09 00:09:30  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:33  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:36  100  3  2016-10-09 00:09:51 900 
3 2016-10-09 00:09:39  100  3  2016-10-09 00:09:51 900 
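The pairing described above can also be sketched in plain Scala (no Spark), grouping the rows by id and joining each status-100 row with its group's status-900 row. The `Record` case class and the sample data here are hypothetical stand-ins for the rows in `sortedRow`:

```scala
// Hypothetical stand-in for the rows in sortedRow.
case class Record(id: Int, time: String, status: Int)

object PairChanges {
  def pair(rows: List[Record]): List[String] =
    rows.groupBy(_.id).toList.sortBy(_._1).flatMap { case (_, group) =>
      // The status-900 row of this id, if any.
      val changed = group.find(_.status == 900)
      for {
        r <- group if r.status == 100     // every status-100 row...
        c <- changed.toList               // ...paired with the change record
      } yield s"${r.id},${r.time},${r.status},${c.id},${c.time},${c.status}"
    }

  def main(args: Array[String]): Unit = {
    val rows = List(
      Record(1, "2016-10-09 00:09:10", 100),
      Record(1, "2016-10-09 00:10:10", 900),
      Record(2, "2016-10-09 00:09:18", 100),
      Record(2, "2016-10-09 00:10:24", 900)
    )
    pair(rows).foreach(println)
  }
}
```

This avoids the manual `count` index entirely; `groupBy` preserves the original order of rows within each group, so the 100-rows stay in time order.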

You could split the two statuses into two CSVs, but what is the rule for appending? Do you append at random, or is there some strict rule? –


I can split them, but for further analysis I need to keep the format above – Ricky


You did not read my question carefully. I asked what the rule for this combination is. –

Answer


I suggest a solution using dataframes, which are an optimized and improved abstraction built on top of RDDs.

I assume the data is in the following format, with a header row:

id,time,status 
1,2016-10-09 00:09:10,100 
1,2016-10-09 00:09:30,100 
1,2016-10-09 00:09:50,100 
1,2016-10-09 00:10:10,900 

The first step is to read the file into a dataframe using sqlContext:

 val sqlContext = sparkSession.sqlContext 
 val dataframe = sqlContext.read.format("csv").option("header", "true").load("absolute path to the input file") 

You should have a dataframe like this:

+---+-------------------+------+ 
|id |time               |status| 
+---+-------------------+------+ 
|1  |2016-10-09 00:09:10|100   | 
|1  |2016-10-09 00:09:30|100   | 
|1  |2016-10-09 00:09:50|100   | 
|1  |2016-10-09 00:10:10|900   | 
|2  |2016-10-09 00:09:18|100   | 
|2  |2016-10-09 00:09:20|100   | 
|2  |2016-10-09 00:10:24|900   | 
|3  |2016-10-09 00:09:30|100   | 
|3  |2016-10-09 00:09:33|100   | 
|3  |2016-10-09 00:09:36|100   | 
|3  |2016-10-09 00:09:39|100   | 
|3  |2016-10-09 00:09:51|900   | 
+---+-------------------+------+ 

The next step is to split the dataframe into two by filtering on status:

val df1 = dataframe.filter(dataframe("status") === "100") 

The output is:

+---+-------------------+------+ 
|id |time               |status| 
+---+-------------------+------+ 
|1  |2016-10-09 00:09:10|100   | 
|1  |2016-10-09 00:09:30|100   | 
|1  |2016-10-09 00:09:50|100   | 
|2  |2016-10-09 00:09:18|100   | 
|2  |2016-10-09 00:09:20|100   | 
|3  |2016-10-09 00:09:30|100   | 
|3  |2016-10-09 00:09:33|100   | 
|3  |2016-10-09 00:09:36|100   | 
|3  |2016-10-09 00:09:39|100   | 
+---+-------------------+------+ 

Do the same for status 900 as df2, with the column names renamed:

val df2 = dataframe.filter(dataframe("status") === "900") 
    .withColumnRenamed("id", "id2") 
    .withColumnRenamed("time", "changed_time") 
    .withColumnRenamed("status", "status2") 

The output should be:

+---+-------------------+-------+ 
|id2|changed_time       |status2| 
+---+-------------------+-------+ 
|1  |2016-10-09 00:10:10|900    | 
|2  |2016-10-09 00:10:24|900    | 
|3  |2016-10-09 00:09:51|900    | 
+---+-------------------+-------+ 

The last step is to join the two dataframes:

val finalDF = df1.join(df2, df1("id") === df2("id2"), "left") 

The final output is:

+---+-------------------+------+---+-------------------+-------+ 
|id |time               |status|id2|changed_time       |status2| 
+---+-------------------+------+---+-------------------+-------+ 
|1  |2016-10-09 00:09:10|100   |1  |2016-10-09 00:10:10|900    | 
|1  |2016-10-09 00:09:30|100   |1  |2016-10-09 00:10:10|900    | 
|1  |2016-10-09 00:09:50|100   |1  |2016-10-09 00:10:10|900    | 
|2  |2016-10-09 00:09:18|100   |2  |2016-10-09 00:10:24|900    | 
|2  |2016-10-09 00:09:20|100   |2  |2016-10-09 00:10:24|900    | 
|3  |2016-10-09 00:09:30|100   |3  |2016-10-09 00:09:51|900    | 
|3  |2016-10-09 00:09:33|100   |3  |2016-10-09 00:09:51|900    | 
|3  |2016-10-09 00:09:36|100   |3  |2016-10-09 00:09:51|900    | 
|3  |2016-10-09 00:09:39|100   |3  |2016-10-09 00:09:51|900    | 
+---+-------------------+------+---+-------------------+-------+ 

Saving the final dataframe to a csv file is easy as well:

finalDF.write.format("csv").save("absolute path to output filename ") 
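If the result should land in a single CSV file with a header row, the write can be adjusted first. This is a sketch under the assumption that the joined result is small enough to fit in one partition; `coalesce` and the `header` option are standard Spark API:

```scala
finalDF.coalesce(1)             // merge partitions so one output file is written
  .write
  .option("header", "true")     // emit the column names as a header row
  .format("csv")
  .save("absolute path to output filename ")
```

Without `coalesce`, Spark writes one part-file per partition into the output directory, which is usually fine for further distributed processing but inconvenient for a single downstream CSV consumer.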