我正试图在Spark中开发一个重型数学计算,无论是在时间还是内存方面(两者都达到O(n^2)
)。我发现持有Iterator
的分区对于大型微积分来说并不足够,因为它强制实例化(尽管懒惰,因为它是一个Iterator
)每行一个对象。事实上,在一个最简单的情况下,例如每行都会保存一个向量。但是它对内存都是有害的,因为我们知道对象的JVM开销和GC上的所有压力以及速度,因为我真的可以提高性能,将我的线性代数操作提高到BLAS级别3(matrix by矩阵,而不是矩阵的矢量,我坚持在这个范例中)。在一个非常简略这里就是我想要实现:Spark:断开分区迭代器以实现更好的内存管理?
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
iter.map (vector => matrixValue * vec)
}
// a bunch of other things relying on that result
}
这里是我的想法:
我的
rdd
在上面的代码缓存,然后有一个Iterator
也没用,是不是它?由于它唯一的优点不是同时记住所有行:但是在这里它已经被计算和缓存,所以所有的行都保存在内存中......当然可以争辩说,Spark的可能有一个智能缓存序列化和压缩数据(我怀疑当存储级别为MEMORY_ONLY
虽然...)。如果1.是真的,那么它产生的唯一东西是巨大的内存开销,因为我拥有与我的
rdd
中的行一样多的JVM对象,但是我可以将它降低到每个分区的单个JVM对象。我甚至可以将它降低到一个对象,每个对象具有一个scalaobject
,它可以作为共享内存用于同一个执行器上的所有分区(我担心这可能很难处理,因为我想保留Spark的弹性,因此如果一个分区因任何原因被删除并重新出现在另一个执行器上,我不想自己处理它,但让Spark将所有相关的对象自己移动......)。
我的想法,因此将vector
这rdd
转化为一个包含矩阵,是这样的:已经面临这个的dilemna
while (???) { // loop over some condition, doesn't really matter what
val matrix = ??? // an instance of a matrix
val broadMatrix = sparkContext.broadcast(matrix)
// rdd is an instance of RDD[Vector] that is already cached
rdd.mapPartition {
iter =>
val matrixValue = broadMatrix.value()
// iter actually contains one single element which is the matrix containing all vectors stacked
// here we have a BLAS-3 operation
iter.map (matrix => matrixValue * matrix)
}
// a bunch of other things relying on that result
}
有人吗?你有没有经历过这种内存管理的提前使用?
我可以想象,即使rdd被缓存,如果您更改为Iterator为非惰性集合,则spark会再次消耗内存(两次)。我想你应该尝试使用一些基准测试来更好地使用哪个版本 –
我想像Spark确实会消耗两次内存......我会尝试一下,看看我是否有更好的表现。我会尽快给出基准的结果! –