我创建了一个spark工作,每天从我的hdfs读入一个文本文件,并从文本文件的每一行中提取唯一键。每个文本文件中大约有50000个键。相同的数据然后通过提取的密钥进行过滤并保存到hdfs。使用Spark多次写入hadoop分布式文件系统
我想在我的hdfs中创建一个目录,其结构为:hdfs://.../date/key,它包含已过滤的数据。问题在于写入hdfs需要很长的时间,因为密钥太多了。
这是写现在的方式:
val inputData = sparkContext.textFile(""hdfs://...", 2)
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings
keys.map(key => {
val filteredData = cleanedData.filter(line => line.contains(key))
filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key")
})
有没有办法让这个更快?我想过将数据重新分区为提取的键的数量,但是我无法以格式hdfs://.../date/key保存。我也试过groupByKey,但是我不能保存这些值,因为它们不是RDD。
任何帮助表示赞赏:)
这个问题是重复的http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – samthebest
我正在寻找一种解决方案,使用* * saveAsTextFile **而不是saveAsHadoopFile,并将它们保存到单独的目录中,而不仅仅是具有不同名称的不同文件。我已经实施了您所链接的解决方案。但具体而言,我想知道是否有更快的方式来创建多个目录。 – akinos
如果我有50,000个密钥,并且我需要创建50,000个分区来映射每个密钥,那么链接到的解决方案仍然很慢。 – akinos