2016-12-09 30 views
5

我有一个嵌套的NDJ(新行分隔的JSON)文件,我需要读入一个spark数据框并保存到实木复合地板。在试图渲染架构我用这个函数:将庞大的JSON文件读入Spark Dataframe

def flattenSchema(schema: StructType, prefix: String = null) : Array[Column] = { 
     schema.fields.flatMap(f => { 
      val colName = if (prefix == null) f.name else (prefix + "." + f.name) 
      f.dataType match { 
      case st: StructType => flattenSchema(st, colName) 
      case _ => Array(col(colName)) 
      } 
     }) 
    } 

上是由

val df = sqlCtx.read.json(sparkContext.wholeTextFiles(path).values)

读返回数据帧我也切换这val df = spark.read.json(path)所以,这只是作品与NDJs,而不是多行JSON - 同样的错误。

这是对工人 java.lang.OutOfMemoryError: Java heap space引起内存不足的错误。

我已经改变了JVM的内存选项和火花执行人/驱动器选项无济于事

有没有办法来流的文件,扁平化架构,并添加到数据帧递增? JSON的某些行包含来自前面条目的新字段......所以稍后需要填写这些字段。

回答

0

您可以通过多种方式实现此目的。

首先在阅读时,您可以提供数据框的架构来读取json,或者您可以允许spark自己推断架构。

一旦json在数据框中,您可以按照以下方法将其平坦化。

a。在数据框上使用explode() - 使其变平。 b。使用spark sql并使用访问嵌套字段。运营商。您可以找到示例here

最后,如果要将新列添加到数据帧 a。第一个选项,使用withColumn()是一种方法。但是,这将针对每个添加的新列和整个数据集进行。 b。使用sql从现有生成新的数据框 - 这可能是最简单的 c。最后,使用地图,然后访问元素,让老模式,增加新的价值,创造新模式,并最终得到了新的DF - 如下

一个withColumn将在整个RDD工作。因此,对于要添加的每个列使用该方法通常不是一个好习惯。有一种方法可以处理地图函数中的列及其数据。由于一个映射函数在这里完成这项工作,所以添加新列及其数据的代码将并行完成。

a。您可以根据计算收集新值。

b。添加这些新的列值到主RDD如下

val newColumns: Seq[Any] = Seq(newcol1,newcol2) 
Row.fromSeq(row.toSeq.init ++ newColumns) 

这里行,是行的地图方法

c中的参考。如下创建新模式

val newColumnsStructType = StructType{Seq(new StructField("newcolName1",IntegerType),new StructField("newColName2", IntegerType)) 

d。添加到旧模式

val newSchema = StructType(mainDataFrame.schema.init ++ newColumnsStructType) 

e。用新的列创建新的数据帧

val newDataFrame = sqlContext.createDataFrame(newRDD, newSchema) 
+0

它如何解决由'wholeTextFiles'产生的'java.lang.OutOfMemoryError'? –

+0

我正在处理“是否有一种方法可以对文件进行流式处理,将模式平滑并逐步添加到数据框中?JSON的某些行包含来自前面条目的新字段......所以稍后需要填写这些字段。 ”。我没有看到关于内存问题解决的问题。所以给了他多种方法。 – Ramzy

+0

如果NDJ是JSONL,那么OP不应该使用wholeTextFiles。如果不是这不会有帮助。 –

2

没有工作。这个问题与JVM对象限制有关。我最终使用了一个scala json解析器并手动构建了数据框。