2017-09-15 75 views
-1

我对Spark比较新,我试图从Spark数据集中过滤掉无效记录。 我的数据集看起来是这样的:在scala中使用自定义函数过滤Spark数据集

| Id | Curr| Col3 | 

| 1 | USD | 1111 | 
| 2 | CNY | 2222 | 
| 3 | USD | 3333 | 
| 1 | CNY | 4444 | 

在我的逻辑,每个ID有vaild货币。因此,这将主要是地图的id->currency

val map = Map(1 -> "USD", 2 -> "CNY")

一个我想筛选出从数据集有标识不相应的有效的货币代码行。所以,我的过滤操作之后,该数据集应该是这个样子:

| Id | Curr| Col3 | 

| 1 | USD | 1111 | 
| 2 | CNY | 2222 | 

我这里的限制是我不能使用UDF。 有人可以帮我想出一个过滤器操作吗?

回答

3

可以出map的创建一个数据帧,然后做与原始数据帧的内连接对其进行过滤:

val map_df = map.toSeq.toDF("Id", "Curr") 
// map_df: org.apache.spark.sql.DataFrame = [Id: int, Curr: string] 

df.join(map_df, Seq("Id", "Curr")).show 
+---+----+----+ 
| Id|Curr|Col3| 
+---+----+----+ 
| 1| USD|1111| 
| 2| CNY|2222| 
+---+----+----+ 
+0

也许我的问题是不够清楚。该数据集可能有一个有效的ID行,但有一个无效的货币代码。像(1,CNY,333)。在这种情况下,我想删除这些条目。我会更新我的问题以反映这种情况。 –

+0

你想'id'和货币都同时匹配吗? – Psidom

+0

是的,我基本上只想保留具有有效的“Id”和“货币”信息的行。任何不匹配'Id'和'Currency'列的行都应该被删除。 –

0
val a = List((1,"USD",1111),(2,"CAN",2222),(3,"USD",4444),(1,"CAN",5555)) 
val b = Map(1 -> "USD",2 -> "CAN") 
a.filter(x => b.keys.exists(_ == x._1)).filter(y => y._2 == b(y._1))