2015-11-10 57 views
2

我有一个加入价目表交易数据帧:星火dataframes:提取基于其他列的值的列

+----------+----------+------+-------+-------+ 
| paid | currency | EUR | USD | GBP | 
+----------+----------+------+-------+-------+ 
| 49.5 | EUR | 99 | 79 | 69 | 
+----------+----------+------+-------+-------+ 

客户已支付EUR 49.5,如图所示的“货币”柱。我现在想要将该付费价格与价目表中的价格进行比较。

为此我需要访问基于“货币”的值正确的列像这样:

df.withColumn("saved", df.col(df.col($"currency")) - df.col("paid")) 

,我希望这将成为

df.withColumn("saved", df.col("EUR") - df.col("paid")) 

这种失败,但是。我尝试了所有我可以想象的东西,包括UDF,都没有进展。

我想这有一些优雅的解决方案?有人可以帮忙吗?

回答

3

假设列名匹配在currency列值:

import org.apache.spark.sql.functions.{lit, col, coalesce} 
import org.apache.spark.sql.Column 

// Dummy data 
val df = sc.parallelize(Seq(
    (49.5, "EUR", 99, 79, 69), (100.0, "GBP", 80, 120, 50) 
)).toDF("paid", "currency", "EUR", "USD", "GBP") 

// A list of available currencies 
val currencies: List[String] = List("EUR", "USD", "GBP") 

// Select listed value 
val listedPrice: Column = coalesce(
    currencies.map(c => when($"currency" === c, col(c)).otherwise(lit(null))): _*) 

df.select($"*", (listedPrice - $"paid").alias("difference")).show 

// +-----+--------+---+---+---+----------+ 
// | paid|currency|EUR|USD|GBP|difference| 
// +-----+--------+---+---+---+----------+ 
// | 49.5|  EUR| 99| 79| 69|  49.5| 
// |100.0|  GBP| 80|120| 50|  -50.0| 
// +-----+--------+---+---+---+----------+ 

与SQL等效的listedPrice表达是这样的:

COALESCE(
    CASE WHEN (currency = 'EUR') THEN EUR ELSE null, 
    CASE WHEN (currency = 'USD') THEN USD ELSE null, 
    CASE WHEN (currency = 'GBP') THEN GBP ELSE null 
) 

使用foldLeft备选:

import org.apache.spark.sql.functions.when 

val listedPriceViaFold = currencies.foldLeft(
    lit(null))((acc, c) => when($"currency" === c, col(c)).otherwise(acc)) 

df.select($"*", (listedPriceViaFold - $"paid").alias("difference")).show 

// +-----+--------+---+---+---+----------+ 
// | paid|currency|EUR|USD|GBP|difference| 
// +-----+--------+---+---+---+----------+ 
// | 49.5|  EUR| 99| 79| 69|  49.5| 
// |100.0|  GBP| 80|120| 50|  -50.0| 
// +-----+--------+---+---+---+----------+ 

其中listedPriceViaFold转化为SQL语句:

CASE 
    WHEN (currency = 'GBP') THEN GBP 
    ELSE CASE 
    WHEN (currency = 'USD') THEN USD 
    ELSE CASE 
     WHEN (currency = 'EUR') THEN EUR 
     ELSE null 

可惜我不知道任何内置的功能,这可以表达直接这样的SQL

CASE currency 
    WHEN 'EUR' THEN EUR 
    WHEN 'USD' THEN USD 
    WHEN 'GBP' THEN GBP 
    ELSE null 
END 

,但您可以用原始SQL使用此结构。

我的假设是不正确的,您可以简单地在列名称和currency列中的值之间添加映射。

编辑

另一种选择,这可能是,如果源支持谓词下推和高效柱修剪效率高,是由货币和工会子集:

currencies.map(
    // for each currency filter and add difference 
    c => df.where($"currency" === c).withColumn("difference", $"paid" - col(c)) 
).reduce((df1, df2) => df1.unionAll(df2)) // Union 

它相当于SQL LIKE这个:

SELECT *, EUR - paid AS difference FROM df WHERE currency = 'EUR' 
UNION ALL 
SELECT *, USD - paid AS difference FROM df WHERE currency = 'USD' 
UNION ALL 
SELECT *, GBP - paid AS difference FROM df WHERE currency = 'GBP' 
+1

我喜欢并且完美的coalesce()方法,很多Spark的工作,但它会很好!感谢那! – TomTom101

+0

不客气。我已经添加了一个解决方案。 – zero323

+0

第二个是相当好,聪明的方式来使用工会。 – mehmetminanc

0

我不能想办法与DataFrame在做这个,我怀疑有简单的方法,但是如果你把该表为RDD

// On top of my head, warn if wrong. 
// Would be more elegant with match .. case 
def d(l: (Int, String, Int, Int, Int)): Int = { 
    if(l._2 == "EUR") 
    l._3 - l._1 
    else if (l._2 == "USD") 
    l._4 - l._1 
    else 
    l._5 -l._1 
} 
val rdd = df.rdd 
val diff = rdd.map(r => (r, r(d))) 

将最有可能加薪类型错误,我希望你可以浏览这些内容。

+0

谢谢!还有几种货币,所以我想避免if/else或在()。otherwise()构造时嵌套。 – TomTom101

+0

说到这一点,我的问题似乎是我无法得到该列$“货币”的字面值,我想知道如何($“列”,[然后])工作。我想知道是否代码的一部分[有](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Column.scala )会帮助我? 'lit(condition).expr' – TomTom101