我在Scala中编写了一个相对简单的Spark作业,它从S3中读取一些数据,执行一些转换和聚合,并最终将结果存储到存储库中。Spark:将RDD元素拆分成块
在最后阶段,我有我的域模型的RDD,并且我想将它们分组为大块的元素,以便我可以在我的存储库中进行一些大规模插入。
我用RDDFunctions.sliding
方法来实现这一点,它工作得很好。这里是我的代码的简化版本:
val processedElements: RDD[DomainModel] = _
RDDFunctions.fromRDD(processedElements)
.sliding(500, 500)
.foreach { elementsChunk =>
Await.ready(repository.bulkInsert(elementsChunk), 1.minute)
}
问题是,如果例如我有1020个元素,最终只有1000个元素在我的存储库中。如果窗口大小大于剩余元素的数量,它看起来像滑动会忽略任何附加元素。
有什么办法可以解决这个问题吗?如果没有,有没有其他方法可以在不使用RDDFunctions.sliding
的情况下实现相同的行为?
我在开始时尝试了类似的内容。不幸的是,这导致了内存问题,因为我最终在内存中保留了很多东西。 – Alex