0
我是新来激发使用星火1.6.1与两名工人各有内存1GB和分配5芯,运行在一个33MB文件的代码。Pyspark错误Java堆空间错误
此代码用于在火花中索引单词。
from textblob import TextBlob as tb
from textblob_aptagger import PerceptronTagger
import numpy as np
import nltk.data
import Constants
from pyspark import SparkContext,SparkConf
import nltk
TOKENIZER = nltk.data.load('tokenizers/punkt/english.pickle')
def word_tokenize(x):
return nltk.word_tokenize(x)
def pos_tag (s):
global TAGGER
return TAGGER.tag(s)
def wrap_words (pair):
''' associable each word with index '''
index = pair[0]
result = []
for word, tag in pair[1]:
word = word.lower()
result.append({ "index": index, "word": word, "tag": tag})
index += 1
return result
if __name__ == '__main__':
conf = SparkConf().setMaster(Constants.MASTER_URL).setAppName(Constants.APP_NAME)
sc = SparkContext(conf=conf)
data = sc.textFile(Constants.FILE_PATH)
sent = data.flatMap(word_tokenize).map(pos_tag).map(lambda x: x[0]).glom()
num_partition = sent.getNumPartitions()
base = list(np.cumsum(np.array(sent.map(len).collect())))
base.insert(0, 0)
base.pop()
RDD = sc.parallelize(base,num_partition)
tagged_doc = RDD.zip(sent).map(wrap_words).cache()
对于较小的文件< 25MB的代码工作正常,但给错误的文件,其尺寸较大的是25MB。
帮我解决这个问题或提供替代这个问题?
你能否提出一个替代解决方案呢? – arjun045
你会解释你在做什么,这是一个不同的问题。无论如何,如果你使用spark,collect只是为了调试,坚持rdd操作。有很多方法可以在pyspark中写出累计总和,如果你在某个地方停留一段时间后再进行一些讨论,然后再进行一些研究。 – marmouset