我已经在python中编写了一个可正常工作的Spark程序。检查RDD中是否存在值
但是,它在内存消耗方面效率低下&我正试图优化它。我在AWS EMR上运行它,EMR正在消耗太多内存的工作。
Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
我相信这个内存问题是由于这样的事实,我在几种情况下收集我的RDDS((),即使用.collect),因为在后期阶段,我需要测试,如果在列表中存在一定的价值是否由这些RDD制成。
所以,目前我的代码看起来像这样:
myrdd = data.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
,并在代码
if word in myrdd:
mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1)) \
.reduceByKey(lambda a,b: a+b) \
.filter(lambda (a, b): b >= 5) \
.map(lambda (a,b) : a) \
.collect()
if word in myrdd2:
mylist2.append(word)
,然后我重复这种模式多次晚些时候。
有没有办法做到操作
if word in myrdd:
do something
不先收集RDD?
是否有像rdd.contains()这样的函数?
P.S:我没有在内存中缓存任何东西。我的火花背景是这样的:从纱线
jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()
不使用.collect ()它会把所有的数据传给驱动程序,如果你有更大的数据集,会产生一个问题。使用myrdd2.foreachRDD并检查值是否存在 – Backtrack
word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter {lambda x:x in word}这样做也是一种解决方法,我会认为 – Backtrack