2015-07-21 90 views
2

我有一个字符串的一大RDD懒惰的foreach(通过几种sc.textFile(...))工会获得。在星火RDD

我现在想搜索该RDD给定的字符串,我想搜索时停止一个“足够好”的比赛已经找到。

我可以改造foreach,或filter,或map用于此目的,但所有这些都将通过每一个元素在RDD迭代,不管比赛是否已经达到。

有没有办法将这个过程短路并避免id遍历整个RDD?

回答

2

我可以改造的foreach,或过滤器,或映射用于此目的,但所有这些都将通过每一个元素在RDD

其实,你错了迭代。星火引擎是足够聪明的优化计算,如果你限制的结果(利用takefirst):

import numpy as np 
from __future__ import print_function 

np.random.seed(323) 

acc = sc.accumulator(0) 

def good_enough(x, threshold=7000): 
    global acc 
    acc += 1 
    return x > threshold 

rdd = sc.parallelize(np.random.randint(0, 10000) for i in xrange(1000000)) 

x = rdd.filter(good_enough).first() 

现在,让我们检查ACCUM:

>>> print("Checked {0} items, found {1}".format(acc.value, x)) 
Checked 6 items, found 7109 

,只是可以肯定,如果一切正常:

acc = sc.accumulator(0) 
rdd.filter(lambda x: good_enough(x, 100000)).take(1) 
assert acc.value == rdd.count() 

可能会做同样的事情,可能会以更有效的方式使用数据框和udf。

注意:在某些情况下,甚至可以在Spark中使用无限序列并仍然可以得到结果。例如,您可以查看我的答案Spark FlatMap function for huge lists

0

不是。没有find方法,就像启发Spark API的Scala集合一样,一旦找到满足谓词的元素就会停止查找。大概你最好的选择是使用一个数据源,它可以最大限度地减少多余的扫描,比如Cassandra,驱动程序会在这里下推一些查询参数。你也可以看看更实验的名为BlinkDB的伯克利项目。底线,Spark更多地用于扫描数据集,比如之前的MapReduce,而不是传统的类似于数据库的查询。

+0

据我所知,RDD实际上更像Scala懒惰集合。请检查[我的回答](http://stackoverflow.com/a/31544650/1560062)并让我知道你是否有任何意见。 – zero323