2017-09-18 63 views
0

我有一个用例,我想迭代地将数据加载到Pandas数据框中,使用外部函数(即xgboost,在示例代码中未显示)做一些处理,然后将结果推送到单个PySpark对象(RDD或DF)。熊猫PySpark给OOM错误,而不是溢出到磁盘

我试图让PySpark在将数据存储为RDD或数据帧时再次泄漏到磁盘,而源数据是Pandas DataFrame。似乎没有什么工作,我一直崩溃的Java驱动程序,我无法加载我的数据。或者,我已经尝试加载我的数据,而不处理只使用基本的textFile RDD,它的工作就像一个魅力。我想知道这是否是一个PySpark错误,或者如果有解决方法。

示例代码:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 
import pyspark 

try: 
    SparkContext.stop(sc) 
except NameError: 
    1 

SparkContext.setSystemProperty('spark.executor.memory', '200g') 
SparkContext.setSystemProperty('spark.driver.memory', '200g') 
sc = SparkContext("local", "App Name") 
sql_sc = SQLContext(sc) 

chunk_100k = pd.read_csv("CData.csv", chunksize=100000) 
empty_df = pd.read_csv("CData.csv", nrows=0) 
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('') 
my_schema = SQLContext.createDataFrame(sql_sc, infer_df).schema 

SparkDF = SQLContext.createDataFrame(sql_sc, empty_df, schema=my_schema) 

for chunk in chunk_100k: 
    SparkDF = SparkDF.union(SQLContext.createDataFrame(sql_sc, chunk, schema=my_schema)) 

崩溃,经过几次反复:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space

工作直接加载到RDD代码:

my_rdd = sc.textFile("CData.csv") \ 
.map(lambda line: line.split(",")) \ 
.filter(lambda line: len(line)>1) \ 
.map(lambda line: (line[0],line[1])) 

更新:

我有改变了e代码演示加载到Spark DataFrame而不是RDD时失败,请注意问题仍然存在,并且错误消息仍然引用RDD。 上一页改变示例代码,保存到RDDS发现使用“并行化”的时候要至少有问题的,原因如下:

Why does SparkContext.parallelize use memory of the driver?

+0

@ zero323我已经改变了上下文使用并行时从歧义问题。 –

+1

它仍然是__exactly相同的问题___('SparkSession.createDataFrame' - >'SparkSession._createFromLocal' - >'SparkContext.parallelize')和失败的原因相同。从本地对象创建分布式数据结构并不是一件容易的事情。如果您想以可扩展的方式加载数据,请使用Spark csv阅读器。 – zero323

+0

除了使用Spark csv阅读器以外没有别的方法吗?我想读大熊猫,而不是csv。这是将文件从熊猫写入磁盘并将其重新加载到Spark中的额外步骤。 –

回答

-1

在文件中apache-创建火花defaults.conf文件火花/ 1.5.1/libexec目录/ conf目录/和下面的行添加到它: spark.driver.memory 45G spark.driver.maxResultSize 10G