2016-12-06 41 views
0

您好我正在使用scala来识别行的第一个字并创建一个唯一值并将其附加到RDD中。但我不知道该怎么做。我是斯卡拉新手,所以请原谅,如果这个问题听起来跛脚。 我正在尝试的示例如下。如何在RDD中添加唯一值火花

样品:

OBR|1|METABOLIC PANEL 
OBX|1|Glucose 
OBX|2|BUN 
OBX|3|CREATININE 
OBR|2|RFLX TO VERIFICATION 
OBX|1|EGFR 
OBX|2|SODIUM 
OBR|3|AMBIGUOUS DEFAULT 
OBX|1|POTASSIUM 

我要检查,如果第一个字是OBR与否,如果它是OBR比我创建了一个独特的价值,并希望将其追加在OBR和下方OBX直到我发现了一个OBR,我想这样做。但是我怎么能做到这一点?我正在将我的数据从HDFS

预期结果:

OBR|1|METABOLIC PANEL|OBR_filename_1 
OBX|1|Glucose|OBR_filename_1 
OBX|2|BUN|OBR_filename_1 
OBX|3|CREATININE|OBR_filename_1 
OBR|2|RFLX TO VERIFICATION|OBR_filename_2 
OBX|1|EGFR|OBR_filename_2 
OBX|2|SODIUM|OBR_filename_2 
OBR|3|AMBIGUOUS DEFAULT|OBR_filename_3 
OBX|1|POTASSIUM|OBR_filename_3 
+0

在分布式系统(如spark和hdfs)中,没有像按顺序读取文件那样的东西。如果所有的OBR都是先读取的,然后再读取所有的OBR,你会如何处理代码?你想让所有其他记录得到最后一个文件名吗?如果不是,如果你在单个文件上使用单个核心运行你的应用程序,那么你可能会按照你期望的顺序读入文件,但是为什么使用spark呢? –

+0

@ASpotySpot我想依次读取它,并检查它是否获取** OBR **作为第一个值创建“OBR_filename_id”,并在所有obx中放入相同的“OBR_filename_id”,直到它到达下一个** OBR ** – animal

+0

因此,您的文件由于它在hdfs上被分成许多部分。例如,顺序阅读意味着什么?如果它没有被分割成许多部分,那么不管怎么做,它都会平行地处理文件的部分内容,除非你使用单个内核,否则让它依次操作依然是棘手的(据我所知)。我可以把一些东西放在一起,但在这一点上,我相信它使用火花毫无意义。我的HDFS中的 –

回答

1

好了,所以在我的评论中提到,这将只在单一内核上,而不应使用火花这样做,除非有人能上的东西我一些启发失踪。 我假设该文件只是您的示例中所述的hdfs上的文本文件。

val text: RDD[(String, Long)] = sc.textFile(<path>).zipWithIndex 
val tupled: RDD[((String, Int, String), Int)] = text.map{case (r, i) => (r.split('|'), i)).map{case (s, i) => ((s(0), s(1).toInt, s(2)), i)} 
val obrToFirstIndex: Array[(Int, Long)] = tupled.filter(_._1._1 == "OBR").map{case (t, i) => (t._2, i)}.reduceByKey(Math.min).collect() 
val bcIndexes = sc.broadcast(obrToFirstIndex.sortBy(_._2)) 
val withObr = tupled.mapValues(i => bcIndexes.value.find(_._2 >= i).getOrElse(bcIndexes.value.last)._1) 
val result: RDD[String] = withObr.map{case ((t1, t2, t2), obrind) => Array(t1, t2, t3, s"OBR_filaneme_$obrind").mkString("|") 

在我的当前ennvironement我无法测试上面的,因此可能会受到差一错误或错别字轻微但这个想法是存在的。但让我重申,这不是一个火花的工作。

编辑:刚刚发生在我身上,因为只有一部分可以使用mapPartitions,只是写代码将如何在该分区内的Java/Scala。

您遇到的问题是查找不正确,它需要不同的条件才能工作。这里是我之前用mapPartitions暗示的更简单的方法

val text: RDD[String] = sc.textFile(<path>) 
val result: RDD[String] = text.mapPartitions{part => 
    var obrInd = 0 
    part.map{r => 
     val code= r.split('|')(0) 
     if(code == "OBR") obrInd += 1 
     r + "|OBR_filename_" + obrInd 
    } 
} 
+0

你能告诉我为什么'reduceByKey(Math.min)'被使用? – animal

+0

在不使用整个分区的火花操作中,没有任何行具有任何其他行的概念。例如)地图不能根据其他行的内容来改变其输出。我们需要以某种方式组合行来解决您的问题。我所做的就是根据OBR ID将所有行组合在一起。然后我采取最小的索引(这里索引是行号)以获得第一次发生。例如)在你的样本中你会得到:1 - > 0,2 - > 4,3 - > 7.然后我们用它来决定哪个索引应该到哪个OBR ID。现在发生对我来说,可能并不需要,如果每个OBR行都有一个唯一的ID –

+0

我想你的方式,但我得到这个结果 'OBR | 1 |代谢小组| OBR_filaneme_1 OBX | 1 |葡萄糖| OBR_filaneme_2 OBX | 2 | BUN | OBR_filaneme_2 OBX | 3 |肌酐| OBR_filaneme_2 OBR | 2 | RFLX核查| OBR_filaneme_2 OBX | 1 | EGFR | OBR_filaneme_3 OBX | 2 | SODIUM | OBR_filaneme_3 OBR | 3 |歧义DEFAULT | OBR_filaneme_3 OBX | 1 | POTASSIUM | OBR_filaneme_3' – animal