2014-07-01 51 views
1

我创建了一个spark工作,每天从我的hdfs读入一个文本文件,并从文本文件的每一行中提取唯一键。每个文本文件中大约有50000个键。相同的数据然后通过提取的密钥进行过滤并保存到hdfs。使用Spark多次写入hadoop分布式文件系统

我想在我的hdfs中创建一个目录,其结构为:hdfs://.../date/key,它包含已过滤的数据。问题在于写入hdfs需要很长的时间,因为密钥太多了。

这是写现在的方式:

val inputData = sparkContext.textFile(""hdfs://...", 2) 
val keys = extractKey(inputData) //keys is an array of approx 50000 unique strings 
val cleanedData = cleanData(inputData) //cleaned data is an RDD of strings 
keys.map(key => { 
    val filteredData = cleanedData.filter(line => line.contains(key)) 
    filteredData.repartition(1).saveAsTextFile("hdfs://.../date/key") 
}) 

有没有办法让这个更快?我想过将数据重新分区为提取的键的数量,但是我无法以格式hdfs://.../date/key保存。我也试过groupByKey,但是我不能保存这些值,因为它们不是RDD。

任何帮助表示赞赏:)

+0

这个问题是重复的http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – samthebest

+0

我正在寻找一种解决方案,使用* * saveAsTextFile **而不是saveAsHadoopFile,并将它们保存到单独的目录中,而不仅仅是具有不同名称的不同文件。我已经实施了您所链接的解决方案。但具体而言,我想知道是否有更快的方式来创建多个目录。 – akinos

+0

如果我有50,000个密钥,并且我需要创建50,000个分区来映射每个密钥,那么链接到的解决方案仍然很慢。 – akinos

回答

0
def writeLines(iterator: Iterator[(String, String)]) = { 
    val writers = new mutalbe.HashMap[String, BufferedWriter] // (key, writer) map 
    try { 
    while (iterator.hasNext) { 
    val item = iterator.next() 
    val key = item._1 
    val line = item._2 
    val writer = writers.get(key) match { 
     case Some(writer) => writer 
     case None => 
     val path = arg(1) + key 
     val outputStream = FileSystem.get(new Configuration()).create(new Path(path)) 
     writer = new BufferedWriter(outputStream) 
    } 
    writer.writeLine(line) 
    } finally { 
    writers.values.foreach(._close()) 
    } 
} 

val inputData = sc.textFile()  
val keyValue = inputData.map(line => (key, line)) 
val partitions = keyValue.partitionBy(new MyPartition(10))  
partitions.foreachPartition(writeLines) 


class MyPartitioner(partitions: Int) extends Partitioner { 
    override def numPartitions: Int = partitions 

    override def getPartition(key: Any): Int = { 
     // make sure lines with the same key in the same partition 
     (key.toString.hashCode & Integer.MAX_VALUE) % numPartitions 
    } 
} 
0

我认为这种方法应该是类似Write to multiple outputs by key Spark - one Spark job。分区号码与目录号码无关。要实现它,您可能需要用您的自定义版本覆盖generateFileNameForKeyValue以保存到不同的目录。

关于可扩展性,它不是火花的问题,而是hdfs。但无论您如何实施,只要不改变要求,这是不可避免的。但我认为Hdfs可能有50,000个文件处理程序

0

您正在为输入指定2个分区,为输出指定1个分区。这样做的一个影响是严重限制了这些操作的并行性。为什么需要这些?

而不是计算50,000个过滤的RDD,这真的很慢,那么直接按键分组怎么样?我知道你想将它们输出到不同的目录中,但这确实造成了瓶颈。有没有另外一种方法来设计这个只是让你阅读(键,值)结果?