2017-04-05 45 views
0

我已经输入CSV文件,如下阅读CSV与字符串文件到RDD火花

123,john,ny,"new-emp-in" 
111,rock,sf,"recently,promoted" 
100,smith,la,"10.101.0129-name,string" 

,所以我怎么能阅读和创建RDD?
com.databricks:spark-csv_2.11:1.5.0支持这个,但我不能使用它。

+0

什么火花版本您使用的? 'val df = spark.read.option(“header”,false).option(“inferSchema”,true).csv(“*。csv”)'适用于我的spark 2.0.2。 – Psidom

+0

是的,但我使用1.x :( – user491

+2

除了认真考虑升级到spark2.x,如果你真的不能使用databrick的csv包(出于神秘的原因!),最简单的方法是使用'textFile'方法来读取文件,并使用正则表达式来查找引号之间包含逗号的列/数据,并将其替换为其他内容。因此,这是基本的数据准备操作,一旦完成了这些操作,就可以使用简单的'rdd.map(lambda行:row.split(“,”))'操作,使每行作为逗号分隔列表 – Pushkr

回答

1

在spark 2.0+中,您可以使用SparkSession.read方法读取多种格式,其中之一是csv。使用这种方法,你可以做到以下几点:

df = spark.read.csv(filename) 

或为RDD刚:

rdd = spark.read.csv(filename).rdd 

更新

在看到您的评论看一看CSV Data Source for Apache Spark 1.x。我相信这是导致Spark 2.0中包含读取方法的一部分。

0

使用旧版本的Spark,您无法直接读取csv。但是你可以包含Databrick的csv jar并使用他们的Python API来实现。

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', 
    inferschema='true').load('cars.csv') 
df.select('year', 
    'model').write.format('com.databricks.spark.csv').save('newcars.csv') 

参考:https://github.com/databricks/spark-csv#python-api

+0

这个有用吗?您有任何后续问题吗? – alpeshpandya