2016-12-07 36 views
1

我需要连接2个太大而无法加载并加入单个处理的RDD。所以我从源RDD和目标RDD获取一些记录并迭代地加入它们。但随着时间的推移,我发现加入速度变得越来越慢,最后程序停在某个阶段。舞台的状态正在升级,从未改变过。似乎迭代中声明的RDD没有被释放,因此系统没有足够的内存用于新的RDD。如何解决它?用RDD迭代处理不会持续很长时间

var A: RDD[(Int, UUID)] = … 
var B: RDD[(Int, UUID)] = … 
for (i <- 0 until 64) { 
    var tmpA = A.filter(x => x._1%64 == i) 
    var tmpB = B.filter(x => x._1%64 == i) 
    var C = A.join(B) 
    println(C.count) 
} 
+0

我在想,如果你可以使用另一种方法。把你的RDD分成更小的分区然后执行连接怎么样? –

+0

如果我直接加入A和B,C太大了。增加并行性毫无用处。 – user1803467

+0

'A'和'B'有多大?当你试图一次“加入”他们会发生什么?你会得到什么错误? – maasg

回答

2

我相信你的问题至少有一部分是原始的RDD不能离开内存。在每一步中,你基本上都会将它们加载到内存中,以便完成过滤器。 相反,您可以将它们拆分为多个RDDS并将其保存到磁盘。然后在加入时仅加载相关对。这意味着通过下一个连接它可以释放其他所有内容。

也就是说,假设您的示例代表您的实际代码,似乎您的RDD有一个架构。我会尝试使用数据集,甚至更好的数据帧,这些数据帧占用较少的内存(根据我的经验,这很容易成为10倍),然后一切都可能适合内存。

+0

非常感谢,我会尝试你的两个建议。 – user1803467