User-defined aggregate function in SparkR

I have mail records like this:
Name MailingID Timestamp Event
1 John 1 2014-04-18 Sent
2 John 2 2015-04-21 Sent
3 Mary 1 2015-04-22 Returned
4 Mary 2 2015-04-25 Sent
5 John 1 2015-05-01 Replied
which can be created as a DataFrame:
df <- createDataFrame(sqlContext, data.frame(Name = c('John','John','Mary','Mary','John'),
MailingID = c(1,2,1,2,1),
Timestamp=c('2014-04-18','2015-04-21','2015-04-22','2015-04-25','2015-05-01'),
Event=c('Sent','Sent','Returned','Sent','Replied')))
I want to find out who has replied to any of the 2 most recent mailings sent to him/her. With a summarizing helper function and dplyr, I can do it like this:
localDf <- collect(df)
library(lubridate)
library(magrittr)
library(dplyr)
hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) {
length(intersect(MailingID[Event == 'Replied'], MailingID[Event == 'Sent'][1:Latest_N])) > 0
}
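To make the helper's logic concrete, here is a small standalone check on plain R vectors (a sketch using John's rows from the data above, already sorted by descending Timestamp, as the dplyr pipeline below arranges them):

```r
# John's events, sorted by Timestamp descending:
# Replied to mailing 1, Sent mailing 2, Sent mailing 1
MailingID <- c(1, 2, 1)
Event     <- c('Replied', 'Sent', 'Sent')

hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) {
  length(intersect(MailingID[Event == 'Replied'],
                   MailingID[Event == 'Sent'][1:Latest_N])) > 0
}

# Mailing 1 was replied to, and it is among the 2 most recent 'Sent'
# mailings (2 and 1), so this evaluates to TRUE.
hasRepliedLatest(MailingID, NULL, Event, 2)
```

Note that the Timestamp argument is never used inside the function; the ordering must already be established by arranging the rows before grouping.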
localDf %>%
arrange(desc(Timestamp)) %>%
group_by(Name) %>%
summarize(RepliedLatest = hasRepliedLatest(MailingID, Timestamp, Event, 2))
detach(package:dplyr) # to avoid function name conflicts with SparkR
The result is:
Name RepliedLatest
1 John TRUE
2 Mary FALSE
Now I want to do this with SparkR, i.e. on the DataFrame rather than on a local data.frame. So I tried:
df %>%
arrange(desc(df$Timestamp)) %>%
group_by(df$Name) %>%
summarize(RepliedLatest = hasRepliedLatest(df$MailingID, df$Timestamp, df$Event, 2))
Then I got errors saying my function does not work with the S4 DataFrame class. How do I do this correctly in SparkR? A solution using a SQL query against the sqlContext created by sparkRHive.init or sparkRSQL.init is also welcome.
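For what it's worth, here is one possible shape of such a SQL query (a sketch only, not tested; it assumes the DataFrame is registered as a temp table named `mail` and that the sqlContext supports window functions, which in Spark 1.x requires a HiveContext from sparkRHive.init):

```r
# Sketch: rank each person's 'Sent' mailings by recency with a window
# function, keep the 2 most recent, and left-join against 'Replied' events.
registerTempTable(df, "mail")

result <- sql(sqlContext, "
  SELECT s.Name,
         COUNT(r.MailingID) > 0 AS RepliedLatest
  FROM (SELECT Name, MailingID,
               ROW_NUMBER() OVER (PARTITION BY Name
                                  ORDER BY Timestamp DESC) AS rn
        FROM mail
        WHERE Event = 'Sent') s
  LEFT JOIN (SELECT DISTINCT Name, MailingID
             FROM mail
             WHERE Event = 'Replied') r
    ON s.Name = r.Name AND s.MailingID = r.MailingID
  WHERE s.rn <= 2
  GROUP BY s.Name")

head(result)
```

On the sample data this should reproduce the dplyr result (John TRUE, Mary FALSE), but I have not verified it against a running Spark cluster.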
Could you explain the encoding? Does "John 2014-04-18 Sent" mean that John received an email on 2014-04-18? – zero323