2017-07-17 23 views
0

我有以下目录结构:处理多个文件分别在星星之火,提交工作

/数据/ MODELA

/数据/ modelB

/数据/ modelC ..

每这些文件的数据格式(编号,分数),我必须为他们分别做以下 -

1)按分数排序和排序分数的降序(DF_1:得分,计数)

2)从DF_1计算累积频率为每个分类后的组分数(DF_2的:从DF_2得分,计数,cumFreq)

3)选择位于累积频率5-10之间:从DF_3(DF_3得分,cumFreq)

4)选择最低分数(DF_4:从文件得分)

5)选择具有得分比DF_4得分更大的所有ID和保存

我可以通过阅读将该目录作为整体文件文件并为所有模型创建公共数据框,然后使用模型上的组。

我想做的事 -

val scores_file = sc.wholeTextFiles("/data/*/") 
val scores = scores_file.map{ line => 
    //step 1 
    //step 2 
    //step 3 
    //step 4 
    //step 5 : save as line._1 
} 

这将有助于分别处理每个文件,并避免小组。

回答

0

假设你的模型是离散值,然后你知道你可以定义所有的模型转换成一个列表

val model = List("modelA", "modelB", "modelC", ...) 

你可以有以下方法:

model.forEach(model => { 
    val scoresPerModel = sc.textFile(model); 
    scoresPerModel.map { line => 
    // business logic here 
    } 
}) 

如果你不”在计算使用Hadoop文件系统API必须读取的业务逻辑并从中提取模型之前,请先了解模型。

private val fs = { 
    val conf = new org.apache.hadoop.conf.Configuration() 
    FileSystem.get(conf) 
    } 
fs.listFiles(new Path(hdfsPath))