2017-02-01 85 views
0

我有一个用〜分隔的文本文件,我需要在转换为数据框之前执行一些分析。该代码读取文本文件为RDD [String]做一些解析。然后,它转换为RDD [行]。然后用模式创建一个数据框。Spark Scala数据框转换

所以下面是我有下面的代码。它的工作原理,但问题是实际的模式长达400个字段。我想知道是否有比输入属性(1),属性(2),属性(3)等更简单的方法。

我目前在Spark 1.6上。 CDH 5.2.2

示例输入:

20161481132310 ~  ~"This" is a comma 10 

当前代码:

val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true), 
StructField("EXAMPLE_2", StringType, true), 
StructField("EXAMPLE_3", StringType, true))) 

val rdd = sc.textFile("example.txt") 
val rdd_truncate = rdd.map(_.split("~").map(_.trim).mkString("~")) 
val row_final = rdd_truncate 
    .map(_.split("~")) 
    .map(attributes => Row(attributes(0), 
    attributes(1), 
    attributes(2))) 

val df = sqlContext.createDataFrame(row_final, schema_1) 

基于建议予修改后用于以下。它的工作原理除引号外。输入中的“This”将失败。有什么建议么?

val df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("delimiter","~") 
     .schema(schema) 
     .load("example.txt") 
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*) 

回答

3

只需使用标准的CSV读者:

spark.read.schema(schema).option("delimiter", "~").csv("example.txt") 

如果要修剪领域只使用select

import org.apache.spark.sql.functions.{col, trim} 

df.select(df.columns.map(c => trim(col(c)).alias(c)): _*) 

如果使用星火1.x中,你可以使用spark-csv

sqlContext.read 
    .format("com.databricks.spark.csv") 
    .schema(schema) 
    .option("delimiter", "~") 
    .load("example.txt") 

如果这是由于某种原因不足,您可以使用Row.fromSeq

Row.fromSeq(line.split("~").take(3)) 
相关问题