User-defined aggregate function in SparkR

2015-09-07

I have mailing records like this:

  Name MailingID  Timestamp    Event
1 John         1 2014-04-18     Sent
2 John         2 2015-04-21     Sent
3 Mary         1 2015-04-22 Returned
4 Mary         2 2015-04-25     Sent
5 John         1 2015-05-01  Replied

which can be created as a DataFrame:

df <- createDataFrame(sqlContext, data.frame(Name = c('John','John','Mary','Mary','John'), 
              MailingID = c(1,2,1,2,1), 
              Timestamp=c('2014-04-18','2015-04-21','2015-04-22','2015-04-25','2015-05-01'), 
              Event=c('Sent','Sent','Returned','Sent','Replied'))) 

I want to find out who has replied to any of the 2 latest mailings sent to him/her. With a summary helper function and dplyr I can do it like this:

localDf <- collect(df) 

library(lubridate) 
library(magrittr) 
library(dplyr) 

hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) { 
    length(intersect(MailingID[Event == 'Replied'], MailingID[Event == 'Sent'][1:Latest_N])) > 0 
} 

localDf %>% 
    arrange(desc(Timestamp)) %>% 
    group_by(Name) %>% 
    summarize(RepliedLatest = hasRepliedLatest(MailingID, Timestamp, Event, 2)) 

detach(package:dplyr) # to avoid function confliction with SparkR 

The result is:

  Name RepliedLatest
1 John          TRUE
2 Mary         FALSE
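For comparison, the same local computation can be done in base R without dplyr, which also sidesteps the function-name clash with SparkR that the `detach` call works around. This is a minimal sketch, assuming the sample data is rebuilt locally as `mail` (a name introduced here) with `Timestamp` parsed as `Date`; the helper is copied unchanged from the question.

```r
# Sample data from the question as a plain local data.frame (hypothetical name: mail)
mail <- data.frame(
  Name      = c("John", "John", "Mary", "Mary", "John"),
  MailingID = c(1, 2, 1, 2, 1),
  Timestamp = as.Date(c("2014-04-18", "2015-04-21", "2015-04-22",
                        "2015-04-25", "2015-05-01")),
  Event     = c("Sent", "Sent", "Returned", "Sent", "Replied"),
  stringsAsFactors = FALSE
)

# Helper from the question (Timestamp is accepted but not used)
hasRepliedLatest <- function(MailingID, Timestamp, Event, Latest_N) {
  length(intersect(MailingID[Event == "Replied"],
                   MailingID[Event == "Sent"][1:Latest_N])) > 0
}

# Newest first, like arrange(desc(Timestamp))
sorted <- mail[order(mail$Timestamp, decreasing = TRUE), ]

# One data.frame per Name, like group_by(Name); row order is preserved
groups <- split(sorted, sorted$Name)

# Apply the helper to each group, like summarize(...)
result <- data.frame(
  Name = names(groups),
  RepliedLatest = unname(vapply(groups, function(g)
    hasRepliedLatest(g$MailingID, g$Timestamp, g$Event, 2), logical(1))),
  stringsAsFactors = FALSE
)
print(result)
```

On the sample data this gives TRUE for John and FALSE for Mary, matching the dplyr output.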

Now I want to do this with SparkR, i.e. on the DataFrame instead of the local data.frame. So I tried:

df %>% 
    arrange(desc(df$Timestamp)) %>% 
    group_by(df$Name) %>% 
    summarize(RepliedLatest = hasRepliedLatest(df$MailingID, df$Timestamp, df$Event, 2)) 

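Then I got an error saying that my function doesn't work with the S4 DataFrame class. How do I do this properly in SparkR? A solution using SQL queries on a sqlContext created by sparkRHive.init or sparkRSQL.init is also welcome.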


Could you explain the encoding? Does "John 2014-04-18 Sent" mean that John received an email on 2014-04-18? – zero323

Answer


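Spark SQL <= 1.4 doesn't support user-defined aggregate functions (UDAFs), and as far as I know SparkR doesn't support UDFs at all, so unless you use the current development branch or the 1.5 RC, this is not an option.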

I'm still not sure I understand your data model and logic, but you could try something like this:

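# Register the DataFrame so it can be queried as "df"
registerTempTable(df, "df")

# Select the last 2 sent events and all other events that occurred in this window.
# The window result is filtered in an outer query, because window functions
# are evaluated after WHERE/HAVING.
tmp <- sql(sqlContext,
    "SELECT * FROM (
       SELECT *, SUM(CASE WHEN Event = 'Sent' THEN 1 ELSE 0 END) OVER (
         PARTITION BY Name ORDER BY DATE(Timestamp) DESC) AS ind
       FROM df WHERE Event IN ('Sent', 'Replied')
     ) t
     WHERE ind <= 2")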


# Split sent and replied 
sent <- tmp %>% filter(tmp$Event == "Sent") 
replied <- tmp %>% filter(tmp$Event == "Replied") 

registerTempTable(sent, "sent") 
registerTempTable(replied, "replied") 

# Join and count. The date condition sits in the ON clause, so names
# without any matching reply (like Mary) are kept and come out as FALSE.
sql(sqlContext,
    "SELECT
     sent.name,
     SUM(
      CASE WHEN replied.event IS NOT NULL THEN 1
      ELSE 0 END
     ) > 0 AS repliedlatest
    FROM sent LEFT JOIN replied ON
     sent.name = replied.name AND
     sent.mailingid = replied.mailingid AND
     -- Not part of the original logic: ignore replies older than the mailing
     DATE(sent.timestamp) <= DATE(replied.timestamp)
    GROUP BY sent.name") %>% head()
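To check the logic of these queries without a Spark cluster, the same steps can be mirrored in base R on the sample data. This is a local sketch, not SparkR code: `mail`, `ev` and `j` are names introduced here, `ave(..., FUN = cumsum)` stands in for the windowed `SUM`, and `merge(..., all.x = TRUE)` stands in for the `LEFT JOIN`, with the date check applied as part of the match so that names without a reply are kept. One difference: `cumsum` counts row by row, whereas a SQL window with `ORDER BY` defaults to a `RANGE` frame, so mailings with identical timestamps could be numbered differently.

```r
# Sample data from the question (hypothetical local name: mail)
mail <- data.frame(
  Name      = c("John", "John", "Mary", "Mary", "John"),
  MailingID = c(1, 2, 1, 2, 1),
  Timestamp = as.Date(c("2014-04-18", "2015-04-21", "2015-04-22",
                        "2015-04-25", "2015-05-01")),
  Event     = c("Sent", "Sent", "Returned", "Sent", "Replied"),
  stringsAsFactors = FALSE
)

# Step 1, like the window query: only Sent/Replied rows,
# PARTITION BY Name ORDER BY Timestamp DESC
ev <- mail[mail$Event %in% c("Sent", "Replied"), ]
ev <- ev[order(ev$Name, -as.numeric(ev$Timestamp)), ]
# Running count of 'Sent' rows within each name, like SUM(CASE ...) OVER (...)
ev$ind <- ave(as.integer(ev$Event == "Sent"), ev$Name, FUN = cumsum)
tmp <- ev[ev$ind <= 2, ]

# Step 2: split sent and replied
cols    <- c("Name", "MailingID", "Timestamp")
sent    <- tmp[tmp$Event == "Sent", cols]
replied <- tmp[tmp$Event == "Replied", cols]

# Step 3: LEFT JOIN on name and mailing id
j <- merge(sent, replied, by = c("Name", "MailingID"), all.x = TRUE,
           suffixes = c(".sent", ".replied"))
# A sent row counts only if a reply exists and is not older than the mailing
j$RepliedLatest <- !is.na(j$Timestamp.replied) &
  j$Timestamp.sent <= j$Timestamp.replied

# Step 4: GROUP BY name, SUM(...) > 0 (same as "any match")
result <- aggregate(RepliedLatest ~ Name, data = j, FUN = any)
print(result)
```

For the sample data this keeps four rows in `tmp` and yields John TRUE and Mary FALSE.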

You're awesome! It runs quite slowly on the whole file, though. If this code can be optimized further, what is the best way? Caching `tmp`? – Bamqf


That's a good place to start, but remember to use `cacheTable`. – zero323


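It should also be possible to avoid the join and build directly on top of `tmp` with window functions, but I'm not sure I understand the data well enough to solve that. Is it possible to get multiple replies? What happens if you have 'Sent -> Sent -> Replied'? – zero323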
