2015-10-22 53 views
3

我有一个数据集,我正在从Spark(例如Parquet文件)访问,其中包含大量的行。我需要将这些行中的一些数据发送到外部服务,并且我需要对它们进行批处理,以便每次对外部服务的调用都包含一定数量的行(例如,每批1000行)。基本上什么take(n)正在做什么,但反复,通过一个大型数据集重复执行。什么是执行此类任务的好方法?我想这可以用foreach()完成,并手动批量聚合数据,但我不知道是否有任何内置/推荐的方式这样做。Spark的迭代take()或批处理?

+2

我不会为这类任务发火花。你确定你不能把这个功能移到别的地方,就像在一个普通的hadoop M/R应用程序中一样。 – davidshen84

回答

2

当你创建一个拼花数据帧文件,它被基于分区的HDFS块位置。

因此,第一个问题是问你是否可以将数据集并行写入外部服务。即同时从多个服务器发送1000行批量。

如果这是好的,那么最有效的方法是foreachPartition函数。喜欢的东西:

df.rdd.forEachPartition { it => 
    it.grouped(1000).foreach(sendBatch) 
} 

如果外部服务无法使用这种方式,则第二个最好的办法是toLocalIterator

df.rdd.toLocalIterator { it => 
    it.grouped(1000).foreach(sendBatch) 
} 

请注意,此解决方案是显著效率较低,因为它会序列每个分区和从执行者转移给司机。

+0

如何在java中实现同样的事情? –

3

我不知道任何内置或推荐的选项,但简单的解决方案是结合RDD API和Scala Iterable API。如果你申请的操作是幂等,你可以从工人直接做到这一点:

val batchSize: Int = ??? 
val rdd: RDD[T] = ??? 
def doSomething(xs: Seq[T]) = ??? 

rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething)) 

否则,你可以在每次向驱动程序获取单个分区:

rdd.cache 
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething) 

请注意,它需要一个单独的作业对于每个分区,因此先缓存输入rdd以避免重新计算是一个好主意。

在Python中,你可以使用toolz库作为替换Iterator API的:

from toolz.itertoolz import partition_all 

rdd.foreachPartition(
    lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)]) 

for xs in partition_all(batchSize, rdd.toLocalIterator()): 
    doSomething(xs) 
+0

如果RDD只是由实木复合地板文件支持的DataFrame,我认为缓存不会有帮助。事实上,如果文件很大,它很可能会减慢速度或者根本不工作。 – kostya