我有一个数据集,我正在从Spark(例如Parquet文件)访问,其中包含大量的行。我需要将这些行中的一些数据发送到外部服务,并且我需要对它们进行批处理,以便每次对外部服务的调用都包含一定数量的行(例如,每批1000行)。基本上什么take(n)
正在做什么,但反复,通过一个大型数据集重复执行。什么是执行此类任务的好方法?我想这可以用foreach()
完成,并手动批量聚合数据,但我不知道是否有任何内置/推荐的方式这样做。Spark的迭代take()或批处理?
回答
当你创建一个拼花数据帧文件,它被基于分区的HDFS块位置。
因此,第一个问题是问你是否可以将数据集并行写入外部服务。即同时从多个服务器发送1000行批量。
如果这是好的,那么最有效的方法是foreachPartition
函数。喜欢的东西:
df.rdd.forEachPartition { it =>
it.grouped(1000).foreach(sendBatch)
}
如果外部服务无法使用这种方式,则第二个最好的办法是toLocalIterator
:
df.rdd.toLocalIterator { it =>
it.grouped(1000).foreach(sendBatch)
}
请注意,此解决方案是显著效率较低,因为它会序列每个分区和从执行者转移给司机。
如何在java中实现同样的事情? –
我不知道任何内置或推荐的选项,但简单的解决方案是结合RDD API和Scala Iterable
API。如果你申请的操作是幂等,你可以从工人直接做到这一点:
val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???
rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))
否则,你可以在每次向驱动程序获取单个分区:
rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)
请注意,它需要一个单独的作业对于每个分区,因此先缓存输入rdd以避免重新计算是一个好主意。
在Python中,你可以使用toolz
库作为替换Iterator
API的:
from toolz.itertoolz import partition_all
rdd.foreachPartition(
lambda iter: [doSomething(xs) for xs in partition_all(batchSize, iter)])
或
for xs in partition_all(batchSize, rdd.toLocalIterator()):
doSomething(xs)
如果RDD只是由实木复合地板文件支持的DataFrame,我认为缓存不会有帮助。事实上,如果文件很大,它很可能会减慢速度或者根本不工作。 – kostya
- 1. Spark Spark待处理批处理
- 2. 迭代批处理文件夹数组
- 3. 迭代目录与批处理
- 4. Facebook Graph API批处理 - 迭代分页
- 5. 使用cURL和批处理线迭代
- 6. DOS批处理命令文件 - 迭代处理
- 7. Spark - Take并减去
- 8. Skip/Take with Spark SQL
- 9. Spark JavaPairRDD迭代
- 10. PHP:mongoDB - 迭代处理
- 11. 无缝处理迭代和非迭代
- 12. 用于kafka主题后处理的spark-streaming批处理间隔
- 13. Dataflow中的迭代处理
- 14. MSBuild批处理迭代器在同一迭代中有所不同
- 15. 批处理文件摆脱输出中的迭代器编号
- 16. 批处理函数参数和函数内部的迭代
- 17. Spark Streaming - 批处理间隔与处理时间
- 18. MapReduce或批处理作业?
- 19. PowerShell或批处理EXITCODE 7zip
- 20. SVCUtil或WSDL批处理
- 21. 迭代和处理ArrayList
- 22. Windows批处理脚本 - 文件迭代问题
- 23. Windows批处理:同时迭代多个文件行
- 24. 在迭代文本文件上运行批处理命令
- 25. 在批处理文件中对数组进行迭代
- 26. 迭代文件夹名称批处理文件
- 27. 如何复制文件。迭代空白目录 - 批处理
- 28. 批处理文件读取文件内容和迭代
- 29. 使用批处理文件对PATH进行迭代
- 30. 在Spring批处理中创建迭代流程步骤
我不会为这类任务发火花。你确定你不能把这个功能移到别的地方,就像在一个普通的hadoop M/R应用程序中一样。 – davidshen84