Getting the last value of a group in a Spark DataFrame

I have a SparkR DataFrame, shown below:

#Create R data.frame 
custId <- c(rep(1001, 5), rep(1002, 3), 1003) 
date <- c('2013-08-01','2014-01-01','2014-02-01','2014-03-01','2014-04-01','2014-02-01','2014-03-01','2014-04-01','2014-04-01') 
desc <- c('New','New','Good','New', 'Bad','New','Good','Good','New') 
newcust <- c(1,1,0,1,0,1,0,0,1) 
df <- data.frame(custId, date, desc, newcust) 

#Create SparkR DataFrame  
df <- createDataFrame(df) 
display(df) 
custId | date       | desc | newcust
-------------------------------------
1001   | 2013-08-01 | New  | 1
1001   | 2014-01-01 | New  | 1
1001   | 2014-02-01 | Good | 0
1001   | 2014-03-01 | New  | 1
1001   | 2014-04-01 | Bad  | 0
1002   | 2014-02-01 | New  | 1
1002   | 2014-03-01 | Good | 0
1002   | 2014-04-01 | Good | 0
1003   | 2014-04-01 | New  | 1

newcust indicates a new customer: it is 1 each time a new custId appears, and also whenever the desc for the same custId reverts to 'New'. What I want is the last desc value within each newcust grouping, while keeping the first date of each grouping. Below is the DataFrame I am after. How can I do this in Spark? Either PySpark or SparkR code would work.

#What I want 
custId | date       | newcust | finaldesc
------------------------------------------
1001   | 2013-08-01 | 1       | New
1001   | 2014-01-01 | 1       | Good
1001   | 2014-03-01 | 1       | Bad
1002   | 2014-02-01 | 1       | Good
1003   | 2014-04-01 | 1       | New

Answers


I don't know SparkR, so I'll answer in PySpark. You can use window functions to achieve this.
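
For reference, here is a minimal sketch (assuming a local SparkSession; not part of the original answer) of building the same sample data in PySpark, so the snippets below can be run as-is:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # assumed local session for testing

# same rows as the R data.frame in the question
df = spark.createDataFrame([
    (1001, '2013-08-01', 'New',  1),
    (1001, '2014-01-01', 'New',  1),
    (1001, '2014-02-01', 'Good', 0),
    (1001, '2014-03-01', 'New',  1),
    (1001, '2014-04-01', 'Bad',  0),
    (1002, '2014-02-01', 'New',  1),
    (1002, '2014-03-01', 'Good', 0),
    (1002, '2014-04-01', 'Good', 0),
    (1003, '2014-04-01', 'New',  1),
], ["custId", "date", "desc", "newcust"])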

First we define the "newcust grouping": since you want every row where newcust equals 1 to start a new group, computing a cumulative sum over each customer will do the trick:

from pyspark.sql import Window 
import pyspark.sql.functions as psf 

w1 = Window.partitionBy("custId").orderBy("date") 
df1 = df.withColumn("subgroup", psf.sum("newcust").over(w1)) 

+------+----------+----+-------+--------+
|custId|      date|desc|newcust|subgroup|
+------+----------+----+-------+--------+
|  1001|2013-08-01| New|      1|       1|
|  1001|2014-01-01| New|      1|       2|
|  1001|2014-02-01|Good|      0|       2|
|  1001|2014-03-01| New|      1|       3|
|  1001|2014-04-01| Bad|      0|       3|
|  1002|2014-02-01| New|      1|       1|
|  1002|2014-03-01|Good|      0|       1|
|  1002|2014-04-01|Good|      0|       1|
|  1003|2014-04-01| New|      1|       1|
+------+----------+----+-------+--------+

For each subgroup, we want to keep the first date:

w2 = Window.partitionBy("custId", "subgroup") 
df2 = df1.withColumn("first_date", psf.min("date").over(w2)) 

+------+----------+----+-------+--------+----------+
|custId|      date|desc|newcust|subgroup|first_date|
+------+----------+----+-------+--------+----------+
|  1001|2013-08-01| New|      1|       1|2013-08-01|
|  1001|2014-01-01| New|      1|       2|2014-01-01|
|  1001|2014-02-01|Good|      0|       2|2014-01-01|
|  1001|2014-03-01| New|      1|       3|2014-03-01|
|  1001|2014-04-01| Bad|      0|       3|2014-03-01|
|  1002|2014-02-01| New|      1|       1|2014-02-01|
|  1002|2014-03-01|Good|      0|       1|2014-02-01|
|  1002|2014-04-01|Good|      0|       1|2014-02-01|
|  1003|2014-04-01| New|      1|       1|2014-04-01|
+------+----------+----+-------+--------+----------+

Finally, we keep only the last row of each subgroup (ordered by date):

w3 = Window.partitionBy("custId", "subgroup").orderBy(psf.desc("date")) 
df3 = df2.withColumn(
    "rn", 
    psf.row_number().over(w3) 
).filter("rn = 1").select(
    "custId", 
    psf.col("first_date").alias("date"), 
    "desc" 
) 

+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+
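
This keeps the right rows, but not quite the asked-for column set (the newcust column and the finaldesc name are missing). One possible tweak to match the expected output exactly, reusing w2 and w3 from above (first_newcust is a helper column introduced here, not part of the original answer):

df2 = df1.withColumn("first_date", psf.min("date").over(w2)) \
         .withColumn("first_newcust", psf.max("newcust").over(w2))  # 1 by construction: each subgroup starts at newcust = 1

df3 = df2.withColumn("rn", psf.row_number().over(w3)) \
    .filter("rn = 1") \
    .select(
        "custId",
        psf.col("first_date").alias("date"),
        psf.col("first_newcust").alias("newcust"),
        psf.col("desc").alias("finaldesc"),
    )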

Here is @Marie's answer in SparkR code:

# cumulative sum of newcust over each customer, ordered by date, defines the subgroups
w1 <- orderBy(windowPartitionBy("custId"), df$date)
df1 <- withColumn(df, "subgroup", over(sum(df$newcust), w1))

# first date within each subgroup
w2 <- windowPartitionBy("custId", "subgroup")
df2 <- withColumn(df1, "first_date", over(min(df1$date), w2))

# keep the last row (by date) of each subgroup, then rename first_date to date
w3 <- orderBy(windowPartitionBy("custId", "subgroup"), desc(df$date))
df3 <- withColumn(df2, "rn", over(row_number(), w3))
df3 <- select(filter(df3, df3$rn == 1), "custId", "first_date", "desc")
df3 <- withColumnRenamed(df3, "first_date", "date")

df3 <- arrange(df3, "custId", "date")
display(df3)
+------+----------+----+
|custId|      date|desc|
+------+----------+----+
|  1001|2013-08-01| New|
|  1001|2014-01-01|Good|
|  1001|2014-03-01| Bad|
|  1002|2014-02-01|Good|
|  1003|2014-04-01| New|
+------+----------+----+
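
Note: as in the question, display() assumes a Databricks notebook; in a plain SparkR session, showDF(df3) or head(df3) prints the same result.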