2017-05-19 189 views
4

我有一个结构化的这样一个CSV文件:如何在使用PySpark作为数据框读取CSV文件时跳过行?

Header 
Blank Row 
"Col1","Col2" 
"1,200","1,456" 
"2,000","3,450" 

我在阅读本文件中的两个问题。

  1. 我想忽略页眉和忽略值内
  2. 的逗号的空白行不是分隔

这里是我的尝试:

df = sc.textFile("myFile.csv")\ 
       .map(lambda line: line.split(","))\ #Split By comma 
       .filter(lambda line: len(line) == 2).collect() #This helped me ignore the first two rows 

然而,这是行不通的,因为值内的逗号被当作分隔符来读取,并且len(line)返回4而不是2.

我尝试另一种方法:

data = sc.textFile("myFile.csv") 
headers = data.take(2) #First two rows to be skipped 

的想法是,然后使用滤波器,而不是读出的标头。但是,当我试图打印标题时,我得到了编码值。

[\x00A\x00Y\x00 \x00J\x00u\x00l\x00y\x00 \x002\x000\x001\x006\x00] 

什么是读取CSV文件并跳过前两行的正确方法?

回答

0

通过Zlidime回答有正确的想法。工作方案是这样的:

import csv 

customSchema = StructType([ \ 
    StructField("Col1", StringType(), True), \ 
    StructField("Col2", StringType(), True)]) 

df = sc.textFile("file.csv")\ 
     .mapPartitions(lambda partition: csv.reader([line.replace('\0','') for line in partition],delimiter=',', quotechar='"')).filter(lambda line: len(line) > 2 and line[0] != 'Col1')\ 
     .toDF(customSchema) 
4

尝试使用带有'quotechar'参数的csv.reader,它会正确拆分该行。 之后,您可以根据需要添加滤镜。

import csv 
from pyspark.sql.types import StringType 

df = sc.textFile("test2.csv")\ 
      .mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"')).filter(lambda line: len(line)>=2 and line[0]!= 'Col1')\ 
      .toDF(['Col1','Col2']) 
+0

不错@zlidime! – titipata

+0

csv。读取器抛出错误:行包含空字节 –

+0

我通过调用'csv.reader([l.replace('\ 0','')in line],delimiter =',',quotechar ='''')来修复它。 ' –

4

对于你的第一个问题,只是压缩与zipWithIndex在RDD的线条和过滤你不想要的线条。 对于第二个问题,您可以尝试从行删除第一个和最后一个双引号字符,然后拆分","上的行。

rdd = sc.textFile("myfile.csv") 
rdd.zipWithIndex(). 
    filter(lambda x: x[1] > 2). 
    map(lambda x: x[0]). 
    map(lambda x: x.strip('"').split('","')). 
    toDF(["Col1", "Col2"]) 

尽管如此,如果你正在寻找对付CSV文件星火一种标准的方式,这是更好地从databricks使用spark-csv包。

+1

Upvoted for your“though” - 另外,该包不应该与Spark 2一起使用,因为它已经集成到Spark中,这使得“虽然”更重要,我会强烈建议这样做因为这是经典的数据标准化/正则化,它不应该成为分析管道的一部分,在Spark之外这样做可以让你使用自定义工具来完成这项工作,然后拥有一个合适的文件格式,每个人都可以使用 –

0

为什么不试试pyspark.sqlDataFrameReader API?这很容易。对于这个问题,我想这条线就足够了。

df = spark.read.csv("myFile.csv") # By default, quote char is " and separator is ',' 

有了这个API,你也可以玩弄其他一些参数,比如标题行,忽略前后空格。这里是链接:DataFrameReader API

+0

这不允许我跳过 –

+0

你试过用'ignoreLeadingWhiteSpace'或'ignoreTrailingWhiteSpace'设置为True吗?我不确定它会工作,但至少,给它一试。 –

+0

并且也尝试'mode = DROPMALFORMED'。我的假设是,它会认为空行是腐败的。 –

1

如果CSV文件的结构总是有两列,在斯卡拉可以实现:

val struct = StructType(
    StructField("firstCol", StringType, nullable = true) :: 
    StructField("secondCol", StringType, nullable = true) :: Nil) 

val df = sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", "false") 
    .option("inferSchema", "false") 
    .option("delimiter", ",") 
    .option("quote", "\"") 
    .schema(struct) 
    .load("myFile.csv") 

df.show(false) 

val indexed = df.withColumn("index", monotonicallyIncreasingId()) 
val filtered = indexed.filter(col("index") > 2).drop("index") 

filtered.show(false) 

结果是:

+---------+---------+ 
|firstCol |secondCol| 
+---------+---------+ 
|Header |null  | 
|Blank Row|null  | 
|Col1  |Col2  | 
|1,200 |1,456 | 
|2,000 |3,450 | 
+---------+---------+ 

+--------+---------+ 
|firstCol|secondCol| 
+--------+---------+ 
|1,200 |1,456 | 
|2,000 |3,450 | 
+--------+---------+ 
+0

PySpark允许你也这样做。这将工作,如果它不是头。只有标题get被读入,其他行被跳过。 –

相关问题