2016-11-25 101 views
2

我已经在python中编写了一个可正常工作的Spark程序。检查RDD中是否存在值

但是,它在内存消耗方面效率低下&我正试图优化它。我在AWS EMR上运行它,EMR正在消耗太多内存的工作。

Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 

我相信这个内存问题是由于这样的事实,我在几种情况下收集我的RDDS((),即使用.collect),因为在后期阶段,我需要测试,如果在列表中存在一定的价值是否由这些RDD制成。

所以,目前我的代码看起来像这样:

myrdd = data.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

,并在代码

if word in myrdd: 
    mylist.append(word) 

myrdd2 = data2.map(lambda word: (word,1))  \ 
     .reduceByKey(lambda a,b: a+b) \ 
     .filter(lambda (a, b): b >= 5) \ 
     .map(lambda (a,b) : a)   \ 
     .collect() 

if word in myrdd2: 
    mylist2.append(word) 

,然后我重复这种模式多次晚些时候。

有没有办法做到操作

if word in myrdd: 
    do something 

不先收集RDD?

是否有像rdd.contains()这样的函数?

P.S:我没有在内存中缓存任何东西。我的火花背景是这样的:从纱线

jobName = "wordcount" 
sc = SparkContext(appName = jobName) 

...... 
...... 

sc.stop() 
+2

不使用.collect ()它会把所有的数据传给驱动程序,如果你有更大的数据集,会产生一个问题。使用myrdd2.foreachRDD并检查值是否存在 – Backtrack

+0

word = sc.broadcast([w1,w2,w3]) valuepresent = myrdd.filter {lambda x:x in word}这样做也是一种解决方法,我会认为 – Backtrack

回答

1

错误消息指出collect不是一个问题,因为你的执行者(而不是驱动器)有记忆问题。

首先,尝试按照错误消息建议并提升spark.yarn.executor.memoryOverhead - 在YARN上运行pyspark时,您可以告诉YARN为python工作进程内存分配更大的容器。

接下来,看看执行者需要大量内存的操作。你使用reduceByKey,也许你可以增加分区的数量,使它们在内存使用方面更小。看看numPartitions参数:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.RDD.reduceByKey

最后,如果你想检查是否RDD包含了一些值,那么仅仅通过这个值过滤和检查,使用countfirst,例如:

looking_for = "....." 
contains = rdd.filter(lambda a: a == looking_for).count() > 0 
+0

谢谢。有很多RDD会对执行者造成压力吗?例如。如果我做了像myrddalias = myrdd之类的东西,它是否会给内存带来额外的压力,或者那样好吗? – Piyush

+1

它只是复制引用,rdds本身不会被克隆 – Mariusz

+0

问题是,looking_for是一个RDD,当我在另一个RDD上执行过滤器时,它向我显示一个错误,说我不能将一个转换放在另一个RDD中。 Looking_for是一个列表,我希望根据存在于look_for rdd中的某个值来修剪我的rdd。确切的错误 - 例外:看起来您正试图广播RDD或引用操作或转换中的RDD。 RDD转换和操作只能由驱动程序调用,而不能在其他转换中调用; – Piyush