我已经输入CSV文件,如下阅读CSV与字符串文件到RDD火花
123,john,ny,"new-emp-in"
111,rock,sf,"recently,promoted"
100,smith,la,"10.101.0129-name,string"
,所以我怎么能阅读和创建RDD?
com.databricks:spark-csv_2.11:1.5.0支持这个,但我不能使用它。
我已经输入CSV文件,如下阅读CSV与字符串文件到RDD火花
123,john,ny,"new-emp-in"
111,rock,sf,"recently,promoted"
100,smith,la,"10.101.0129-name,string"
,所以我怎么能阅读和创建RDD?
com.databricks:spark-csv_2.11:1.5.0支持这个,但我不能使用它。
在spark 2.0+中,您可以使用SparkSession.read
方法读取多种格式,其中之一是csv。使用这种方法,你可以做到以下几点:
df = spark.read.csv(filename)
或为RDD刚:
rdd = spark.read.csv(filename).rdd
更新
在看到您的评论看一看CSV Data Source for Apache Spark 1.x。我相信这是导致Spark 2.0中包含读取方法的一部分。
使用旧版本的Spark,您无法直接读取csv。但是你可以包含Databrick的csv jar并使用他们的Python API来实现。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
inferschema='true').load('cars.csv')
df.select('year',
'model').write.format('com.databricks.spark.csv').save('newcars.csv')
这个有用吗?您有任何后续问题吗? – alpeshpandya
什么火花版本您使用的? 'val df = spark.read.option(“header”,false).option(“inferSchema”,true).csv(“*。csv”)'适用于我的spark 2.0.2。 – Psidom
是的,但我使用1.x :( – user491
除了认真考虑升级到spark2.x,如果你真的不能使用databrick的csv包(出于神秘的原因!),最简单的方法是使用'textFile'方法来读取文件,并使用正则表达式来查找引号之间包含逗号的列/数据,并将其替换为其他内容。因此,这是基本的数据准备操作,一旦完成了这些操作,就可以使用简单的'rdd.map(lambda行:row.split(“,”))'操作,使每行作为逗号分隔列表 – Pushkr