2017-05-09 107 views
0

我在Scala中编写了一个相对简单的Spark作业,它从S3中读取一些数据,执行一些转换和聚合,并最终将结果存储到存储库中。Spark:将RDD元素拆分成块

在最后阶段,我有我的域模型的RDD,并且我想将它们分组为大块的元素,以便我可以在我的存储库中进行一些大规模插入。

我用RDDFunctions.sliding方法来实现这一点,它工作得很好。这里是我的代码的简化版本:

val processedElements: RDD[DomainModel] = _ 
RDDFunctions.fromRDD(processedElements) 
    .sliding(500, 500) 
    .foreach { elementsChunk => 
     Await.ready(repository.bulkInsert(elementsChunk), 1.minute) 
    } 

问题是,如果例如我有1020个元素,最终只有1000个元素在我的存储库中。如果窗口大小大于剩余元素的数量,它看起来像滑动会忽略任何附加元素。

有什么办法可以解决这个问题吗?如果没有,有没有其他方法可以在不使用RDDFunctions.sliding的情况下实现相同的行为?

回答

0

难道你不能只使用foreachPartition和手动批次管理?

fromRDD.foreachPartition(items: Iterator[DomainModel] => { 
    val batch = new ArrayBuffer[DomainModel](BATCH_SIZE) 
    while (items.hasNext) { 
    if (batch.size >= BATCH_SIZE) { 
     bulkInsert(batch) 
     batch.clear() 
    } 
    batch += items.next 
    } 
    if (!batch.isEmpty) { 
     bulkInsert(batch) 
    } 
}) 
+0

我在开始时尝试了类似的内容。不幸的是,这导致了内存问题,因为我最终在内存中保留了很多东西。 – Alex

0

你说得对,星火的sliding(不像Scala的),会如果窗口大小超过其余项目的数量,根据RDDFunctions doc产生一个空RDD。 Spark也没有与Scala的grouped等价。

如果您知道要创建多少个群组,则可能适用的解决方法是将RDD与modulo过滤器分开。下面是将RDD分成5组的简单示例:

val rdd = sc.parallelize(Seq(
    (0, "text0"), (1, "text1"), (2, "text2"), (3, "text2"), (4, "text2"), (5, "text5"), 
    (6, "text6"), (7, "text7"), (8, "text8"), (9, "text9"), (10, "text10"), (11, "text11") 
)) 

def g(n:Int)(x: Int): Boolean = { x % 5 == n } 

val rddList = (0 to 4).map(n => rdd.filter(x => g(n)(x._1))) 

(0 to 4).foreach(n => rddList(n).collect.foreach(println)) 

(0,text0) 
(5,text5) 
(10,text10) 

(1,text1) 
(6,text6) 
(11,text11) 

(2,text2) 
(7,text7) 

(3,text2) 
(8,text8) 

(4,text2) 
(9,text9) 
+0

不幸的是我不知道会有多少组。投入在不断变化,所以每次我们可能会得到不同数量的团体。 – Alex