2016-09-16 130 views
5

我想从简单的CSV文件创建Spark数据集。下面是CSV文件的内容:从CSV文件创建Spark数据集

name,state,number_of_people,coolness_index 
trenton,nj,"10","4.5" 
bedford,ny,"20","3.3" 
patterson,nj,"30","2.2" 
camden,nj,"40","8.8" 

这里是使数据集的代码:

var location = "s3a://path_to_csv" 

case class City(name: String, state: String, number_of_people: Long) 

val cities = spark.read 
    .option("header", "true") 
    .option("charset", "UTF8") 
    .option("delimiter",",") 
    .csv(location) 
    .as[City] 

以下是错误消息:“不能达到投number_of_people从字符串到BIGINT,因为它可能截断“

Databricks谈论如何在this blog post中创建数据集和此特定错误消息。

编码器急切地检查你的数据预期的架构, 匹配提供错误信息帮助你试图将数据的错误 过程的TB之前。例如,如果我们尝试使用 太小的数据类型,例如转换为对象会导致 截断(即numStudents大于一个字节,其中最大值为255),分析器将发出一个数据类型为 AnalysisException。

我使用的是Long类型,所以我没想到会看到这个错误信息。

回答

11

使用架构推断:

val cities = spark.read 
    .option("inferSchema", "true") 
    ... 

或提供模式:

val cities = spark.read 
    .schema(StructType(Array(StructField("name", StringType), ...) 

或投:

val cities = spark.read 
    .option("header", "true") 
    .csv(location) 
    .withColumn("number_of_people", col("number_of_people").cast(LongType)) 
    .as[City] 
0

与案例类城市(名称:字符串,状态:字符串, number_of_people:长), 你只需要一条线

private val cityEncoder = Seq(City("", "", 0)).toDS 

,那么你的代码

val cities = spark.read 
.option("header", "true") 
.option("charset", "UTF8") 
.option("delimiter",",") 
.csv(location) 
.as[City] 

将只是工作。

这是,得自这个网站的官方消息[http://spark.apache.org/docs/latest/sql-programming-guide.html#overview][1]