2015-10-17 50 views
4

我正在使用Scala中的Apache Spark。Spark关闭参数绑定

我试图用第二个RDD的数据操作一个RDD时出现问题。我试图将第二个RDD作为参数传递给第一个RDD的“映射”函数,但看起来在该函数上创建的闭包绑定了该值的未初始​​化版本。

以下是一段简单的代码,显示我看到的问题的类型。 (我第一次遇到麻烦的真实例子是更大,更不容易理解)。

我不太了解Spark闭包的参数绑定规则。

我真正需要的是如何使用另一个RDD的内容(之前在别处构建)来操作一个RDD的基本方法或模式。

在下面的代码,调用Test1.process(SC)将失败,在findSquare一个空指针访问(如在闭合结合的第二ARG未被初始化)

object Test1 { 

    def process(sc: SparkContext) { 
    val squaresMap = (1 to 10).map(n => (n, n * n)) 
    val squaresRDD = sc.parallelize(squaresMap) 

    val primes = sc.parallelize(List(2, 3, 5, 7)) 

    for (p <- primes) { 
     println("%d: %d".format(p, findSquare(p, squaresRDD))) 
    } 
    } 

    def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = { 
    squaresRDD.filter(kv => kv._1 == n).first._1 
    } 
} 

回答

4

问题,你的经验无关,与闭合或RDDS其中,流行的看法相反,are serializable

它只是打破了一个基本的Spark规则,它规定你不能触发一个动作或从另一个动作或转换转换*并且这个问题的不同变体已经被多次询问。

要理解为什么这是你要想想该架构的情况:

  • SparkContext是对驾驶员管理
  • 里面的一切变革是在工人执行的情况。每个工作人员只能访问自己的部分数据,不能与其他工作人员进行交流。

如果你想使用你必须要使用的结合RDDS,像joincartesianzipunion变换的一个多RDDS的内容。

在这里,您最有可能(我不知道为什么你传递数组和使用这个元组仅第一个元素)希望用广播变量:

val squaresMapBD = sc.broadcast(squaresMap) 

def findSquare(n: Int): Seq[(Int, Int)] = { 
    squaresMapBD.value 
    .filter{case (k, v) => k == n} 
    .map{case (k, v) => (n, k)} 
    .take(1) 
} 

primes.flatMap(findSquare) 

或笛卡尔:

primes 
    .cartesian(squaresRDD) 
    .filter{case (n, (k, _)) => n == k}.map{case (n, (k, _)) => (n, k)} 

转换primes接到虚对(Int, null)join会更有效:

primes.map((_, null)).join(squaresRDD).map(...) 

但根据您的意见我假设你有一个场景,当有自然连接条件。

根据上下文,还可以考虑使用数据库或文件来存储公用数据。

在旁注中RDD不可迭代,因此您不能简单地使用for循环。为了能够做到这一点,你必须首先collect或将toLocalIterator转换成。您也可以使用foreach方法。


*确切的说,您不能访问SparkContext

** Torrent广播和树聚合涉及执行者之间的通信,所以它在技术上是可行的。

+1

好的,谢谢。我对Spark很新,并且还没有遇到你提到的规则(或者如果我在某处看到它的话,它没有打到家)。我很自然地想到在处理另一个数据集时“咨询”一个数据集,但我显然需要调整我的想法。 –

+1

感谢您的补充信息。我只有2周的时间学习Spark,所以我仍然试图让基本概念内化。广播变量可能正是我认为我想将一个arg传递给映射函数的一些情况。 –

+0

根据数据的大小,您应该按照以下顺序传递它:参数 zero323

-3

RDD不能序列化,所以你不能在rdd转换中使用rdd。 然后我从来没有见过用for语句枚举rdd,通常我使用的是foreach语句,它是rdd api的一部分。

为了将数据从两个RDD结合起来,你可以利用加入,工会或广播(如果您的RDD小)

+1

我以前见过RDD可序列化的意见。我在下面的帖子中找到了相当恰当的答案。对于如何处理一个数据集与另一个数据集操作的一般情况(他们是根本不同类型的数据并且不自然地“加入”等),我仍然在脑海中留下了一个困惑。 stackoverflow.com/questions/29567247/serializing-rdd –

+0

我恰巧正在使用旧版本的Spark(1.2),正如其他帖子的答案中提到的那样,当您尝试执行我作为SparkContext字段所做的操作时,会生成空指针异常在RDD被反序列化后没有被设置。 –

+1

RDD可序列化请参阅[docs](https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.rdd.RDD)和[source](https: //github.com/apache/spark/blob/f85aa06464a10f5d1563302fd76465dded475a12/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L77)。 – zero323