我正在使用Scala中的Apache Spark。Spark关闭参数绑定
我试图用第二个RDD的数据操作一个RDD时出现问题。我试图将第二个RDD作为参数传递给第一个RDD的“映射”函数,但看起来在该函数上创建的闭包绑定了该值的未初始化版本。
以下是一段简单的代码,显示我看到的问题的类型。 (我第一次遇到麻烦的真实例子是更大,更不容易理解)。
我不太了解Spark闭包的参数绑定规则。
我真正需要的是如何使用另一个RDD的内容(之前在别处构建)来操作一个RDD的基本方法或模式。
在下面的代码,调用Test1.process(SC)将失败,在findSquare一个空指针访问(如在闭合结合的第二ARG未被初始化)
object Test1 {
def process(sc: SparkContext) {
val squaresMap = (1 to 10).map(n => (n, n * n))
val squaresRDD = sc.parallelize(squaresMap)
val primes = sc.parallelize(List(2, 3, 5, 7))
for (p <- primes) {
println("%d: %d".format(p, findSquare(p, squaresRDD)))
}
}
def findSquare(n: Int, squaresRDD: RDD[(Int, Int)]): Int = {
squaresRDD.filter(kv => kv._1 == n).first._1
}
}
好的,谢谢。我对Spark很新,并且还没有遇到你提到的规则(或者如果我在某处看到它的话,它没有打到家)。我很自然地想到在处理另一个数据集时“咨询”一个数据集,但我显然需要调整我的想法。 –
感谢您的补充信息。我只有2周的时间学习Spark,所以我仍然试图让基本概念内化。广播变量可能正是我认为我想将一个arg传递给映射函数的一些情况。 –
根据数据的大小,您应该按照以下顺序传递它:参数
zero323