2015-07-21 115 views
2

我对spark和scala很新颖,因此我有一些关于使用spark和使用rdds进行数据预处理的问题。 我正在开发一个小项目,我想用spark实现一个机器学习系统。使用这些算法是可以的,但我认为在预处理数据时遇到了问题。 我有一个包含30列和大约一百万行的数据集。但是,为了简单起见,让我们假设我有以下的数据集(CSV文件):使用apache spark和scala进行数据预处理

columnA, columnB, column_txt, label 
1  , a  , abc  , 0 
2  ,  , abc  , 0 
3  , b  , abc  , 1 
4  , b  , abc  , 1 
5  , a  , abc  , 0 
6  ,  , abc  , 0 
7  , c  , abc  , 1 
8  , a  , abc  , 1 
9  , b  , abc  , 1 
10  , c  , abc  , 0 

火花加载数据后,我想要做的步骤如下:

  1. 删除所有以结束所有列“ _txt”
  2. 筛选出所有行columnB是空的(这个我想通了的话)
  3. 删除有超过9级(从这里columnA)
那些列10

所以我有问题1和3. 我知道我不能删除列,所以我不得不创建一个新的RDD,但我怎么做没有某些列? 现在我正在加载csv文件,但没有火花头,但为我的任务,我需要。建议在单独的rdd中加载标题吗?但是,我该如何与该rdd进行交互以找到正确的列呢? 对不起,我知道很多问题,但我仍然在一开始就想学习。如果没有头装

import org.apache.spark.sql.DataFrame 

def moreThan9(df: DataFrame, col: String) = { 
    df.agg(countDistinct(col)).first()(0) match { 
     case x: Long => x > 9L 
     case _ => false 
    } 
} 

val newDf = df. 
    schema. // Extract schema 
    toArray. // Convert to array 
    map(_.name). // Map to names 
    foldLeft(df)((df: DataFrame, col: String) => { 
     if (col.endsWith("_txt") | moreThan9(df, col)) df.drop(col) else df 
    }) 

: 感谢和问候, 克里斯

+0

如果我是这样做的“手工”,我会创建一个RDD,调用'.take(1)'在主服务器上获取头文件,对头文件进行任何处理/并行处理,然后放下第一行以获取RDD中的数据。 – lmm

回答

1

假设数据帧装有头和结构是平的:

val df = sqlContext. 
    read. 
    format("com.databricks.spark.csv"). 
    option("header", "true"). 
    load("data.csv") 

这样的事情应该工作那么你可以使用映射从自动分配到实际做同样的事情。

+0

非常感谢您的回答!我会在接下来的几天内尝试。 – csnr

+0

为什么foldLeft而不是过滤? – dskrvk

+0

@dskrvk你的意思是类似'.filter(...)。foreach(col => df = df.drop(col))''?简单来说,我猜想的是参照透明度和个人品味。 – zero323

相关问题