2017-08-10 22 views
3

我正试图在Spark中开发一个重型数学计算,无论是在时间还是内存方面(两者都达到O(n^2))。我发现持有Iterator的分区对于大型微积分来说并不足够,因为它强制实例化(尽管懒惰,因为它是一个Iterator)每行一个对象。事实上,在一个最简单的情况下,例如每行都会保存一个向量。但是它对内存都是有害的,因为我们知道对象的JVM开销和GC上的所有压力以及速度,因为我真的可以提高性能,将我的线性代数操作提高到BLAS级别3(matrix by矩阵,而不是矩阵的矢量,我坚持在这个范例中)。在一个非常简略这里就是我想要实现:Spark:断开分区迭代器以实现更好的内存管理?

while (???) { // loop over some condition, doesn't really matter what 
    val matrix = ??? // an instance of a matrix 
    val broadMatrix = sparkContext.broadcast(matrix) 
    // rdd is an instance of RDD[Vector] that is already cached 
    rdd.mapPartition { 
     iter => 
      val matrixValue = broadMatrix.value() 
      iter.map (vector => matrixValue * vec) 
    } 
    // a bunch of other things relying on that result 
} 

这里是我的想法:

  1. 我的rdd在上面的代码缓存,然后有一个Iterator也没用,是不是它?由于它唯一的优点不是同时记住所有行:但是在这里它已经被计算和缓存,所以所有的行都保存在内存中......当然可以争辩说,Spark的可能有一个智能缓存序列化和压缩数据(我怀疑当存储级别为MEMORY_ONLY虽然...)。

  2. 如果1.是真的,那么它产生的唯一东西是巨大的内存开销,因为我拥有与我的rdd中的行一样多的JVM对象,但是我可以将它降低到每个分区的单个JVM对象。我甚至可以将它降低到一个对象,每个对象具有一个scala object,它可以作为共享内存用于同一个执行器上的所有分区(我担心这可能很难处理,因为我想保留Spark的弹性,因此如果一个分区因任何原因被删除并重新出现在另一个执行器上,我不想自己处理它,但让Spark将所有相关的对象自己移动......)。

我的想法,因此将vectorrdd转化为一个包含矩阵,是这样的:已经面临这个的dilemna

while (???) { // loop over some condition, doesn't really matter what 
    val matrix = ??? // an instance of a matrix 
    val broadMatrix = sparkContext.broadcast(matrix) 
    // rdd is an instance of RDD[Vector] that is already cached 
    rdd.mapPartition { 
     iter => 
      val matrixValue = broadMatrix.value() 
      // iter actually contains one single element which is the matrix containing all vectors stacked 
      // here we have a BLAS-3 operation 
      iter.map (matrix => matrixValue * matrix) 
    } 
    // a bunch of other things relying on that result 
} 

有人吗?你有没有经历过这种内存管理的提前使用?

+0

我可以想象,即使rdd被缓存,如果您更改为Iterator为非惰性集合,则spark会再次消耗内存(两次)。我想你应该尝试使用一些基准测试来更好地使用哪个版本 –

+0

我想像Spark确实会消耗两次内存......我会尝试一下,看看我是否有更好的表现。我会尽快给出基准的结果! –

回答

1

因为我真的可以提高性能,改善我的线性代数运算直到BLAS级别3(矩阵通过矩阵,而不是通过我在此范例中卡住的向量的矩阵)。

使用Iterators不会强迫你以任何方式使用Vectors,或者为每个分区甚至超过一个元素。如果需要,您可以轻松地为每个分割创建一个单独的Matrix对象。

有害的内存,因为我们知道在JVM开销对象和被放在GC

我要说,这是比这更复杂的所有压力。使用Iterators的原因是能够处理大于内存的分区。使用懒惰Iterators和小对象Spark可以将部分结果泄漏到磁盘并​​使其可用于垃圾收集。这在使用单个大对象时不会发生。根据我的经验,Spark对于大型物体的GC问题更容易受到影响。

基于描述,我怀疑它应该是有意义的,以避免显式存储数据,而是使用堆内存明确地初始化对象。这应该保持GC在海湾,并允许您处理大型物体。但是可以付出等级的方式。