2017-08-28 32 views
0

在我的用例中,我有一个包含10万条记录的配置单元表。每个记录代表一个必须处理的原始数据文件。处理每个原始数据文件会生成一个csv文件,其大小将在10MB到500MB之间变化。最终,这些CSV文件将作为单独的进程填充到HIVE表中。在我的企业集群中,仍然不建议在hdfs中生成大量的数据。因此,我更愿意将这两个单独的流程合并为一个流程,以便他们处理5000条记录的5000条记录。处理火花驱动程序中的Hive记录

我的问题: -

鉴于我RDD指的是整个蜂巢表,我怎么执行的原始数据处理步骤,每5000条记录? (类似于for循环,每次增加5000个记录)

回答

1

一种方法是使用RDD的滑动功能。你可以在apache spark的mllib包中找到它。这里是你如何使用它。 假设我们有1000个元素

val rdd = sc.parallelize(1 to 1000) 
import org.apache.spark.mllib.rdd._ 
val newRdd = RDDFunctions.fromRDD(rdd) 

// sliding by 10 (instead use 5000 or what you need) 
val rddSlidedBy10 = newRdd.sliding(10, 10) 

的RDD结果会是这样

Array(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), Array(11, 12, 13, 14, 15, 16, 17, 18, 19, 20), Array(21, 22, 23, 24, 25, 26, 27, 28, 29, 30), Array(31, 32, 33, 34, 35, 36, 37, 38, 39, 40), Array(41, 42, 43, 44, 45, 46, 47, 48, 49, 50), Array(51, 52, 53, 54, 55, 56, 57, 58, 59, 60), Array(61, 62, 63, 64, 65, 66, 67, 68, 69, 70), Array(71, 72, 73, 74, 75, 76, 77, 78, 79, 80) 

的你可以在阵列和处理原始数据的foreach到CSV

+0

感谢。看来这个方法在spark 2.xx中不幸的是,对于这个用例,我需要使用spark 1.6.2。 – Bala

+0

也有替代方法:https://stackoverflow.com/questions/43877678/spark-split-rdd-elements-into-chunks – dumitru

相关问题