我有一个用例,我想迭代地将数据加载到Pandas数据框中,使用外部函数(即xgboost,在示例代码中未显示)做一些处理,然后将结果推送到单个PySpark对象(RDD或DF)。熊猫PySpark给OOM错误,而不是溢出到磁盘
我试图让PySpark在将数据存储为RDD或数据帧时再次泄漏到磁盘,而源数据是Pandas DataFrame。似乎没有什么工作,我一直崩溃的Java驱动程序,我无法加载我的数据。或者,我已经尝试加载我的数据,而不处理只使用基本的textFile RDD,它的工作就像一个魅力。我想知道这是否是一个PySpark错误,或者如果有解决方法。
示例代码:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
import pyspark
try:
SparkContext.stop(sc)
except NameError:
1
SparkContext.setSystemProperty('spark.executor.memory', '200g')
SparkContext.setSystemProperty('spark.driver.memory', '200g')
sc = SparkContext("local", "App Name")
sql_sc = SQLContext(sc)
chunk_100k = pd.read_csv("CData.csv", chunksize=100000)
empty_df = pd.read_csv("CData.csv", nrows=0)
infer_df = pd.read_csv("CData.csv", nrows=10).fillna('')
my_schema = SQLContext.createDataFrame(sql_sc, infer_df).schema
SparkDF = SQLContext.createDataFrame(sql_sc, empty_df, schema=my_schema)
for chunk in chunk_100k:
SparkDF = SparkDF.union(SQLContext.createDataFrame(sql_sc, chunk, schema=my_schema))
崩溃,经过几次反复:
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.readRDDFromFile. : java.lang.OutOfMemoryError: Java heap space
工作直接加载到RDD代码:
my_rdd = sc.textFile("CData.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1]))
更新:
我有改变了e代码演示加载到Spark DataFrame而不是RDD时失败,请注意问题仍然存在,并且错误消息仍然引用RDD。 上一页改变示例代码,保存到RDDS发现使用“并行化”的时候要至少有问题的,原因如下:
Why does SparkContext.parallelize use memory of the driver?
@ zero323我已经改变了上下文使用并行时从歧义问题。 –
它仍然是__exactly相同的问题___('SparkSession.createDataFrame' - >'SparkSession._createFromLocal' - >'SparkContext.parallelize')和失败的原因相同。从本地对象创建分布式数据结构并不是一件容易的事情。如果您想以可扩展的方式加载数据,请使用Spark csv阅读器。 – zero323
除了使用Spark csv阅读器以外没有别的方法吗?我想读大熊猫,而不是csv。这是将文件从熊猫写入磁盘并将其重新加载到Spark中的额外步骤。 –