我是Apache Spark的初学者。我想过滤出所有在RDD中权重总和大于常数的组。 “体重”图也是RDD。这里是一个小尺寸的演示中,待过滤的被存储在“基团”的基团,所述恒定值是12:如何根据基于Spark中另一个RDD的函数过滤RDD?
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
val wm = weights.toArray.toMap
def isheavy(inp: String): Boolean = {
val allw = inp.split(",").map(wm(_)).sum
allw > 12
}
val result = groups.filter(isheavy)
当输入数据是非常大的,> 10GB例如,我总是遇到“java堆内存不足”错误。我怀疑它是否是由“weights.toArray.toMap”引起的,因为它将分布式RDD转换为JVM中的Java对象。所以,我想用RDD直接进行过滤:
val groups = sc.parallelize(List("a,b,c,d", "b,c,e", "a,c,d", "e,g"))
val weights = sc.parallelize(Array(("a", 3), ("b", 2), ("c", 5), ("d", 1), ("e", 9), ("f", 4), ("g", 6)))
def isheavy(inp: String): Boolean = {
val items = inp.split(",")
val wm = items.map(x => weights.filter(_._1 == x).first._2)
wm.sum > 12
}
val result = groups.filter(isheavy)
当我加载这个脚本到火花后壳跑result.collect
,我得到了一个“显示java.lang.NullPointerException”错误。有人告诉我,在另一个RDD中操作RDD时,会出现空指针异常,并建议我将权重放入Redis。
那么我怎样才能得到“结果”没有转换“权重”地图,或把它放到Redis?如果有一种解决方案可以在没有外部数据存储服务的帮助下基于另一个类似地图的RDD过滤RDD? 谢谢!
放大'spark.executor.memory'确实有效。 – Chad 2014-09-28 03:37:30