2017-08-08 26 views
2

我想将火花数据帧中的NA/NULL分配给最近的邻居。我来自R背景,所以我使用sparklyr,但无法找到一种方法来做到这一点。Sparklyr填充火花数据帧中的NA/NULL

下面是一个例子代码:

set.seed(1)  
example <- data.frame (ID = 1:10, Cat = letters[1:5], 
         Numb = sample(c(NA, NA, NA, NA, 1:10), 10)) 
    ID Cat Numb 
    1 1 a NA 
    2 2 b 1 
    3 3 c 3 
    4 4 d 6 
    5 5 e NA 
    6 6 a 5 
    7 7 b 4 
    8 8 c 9 
    9 9 d 10 
    10 10 e NA 

所以想填写麻木柱,ID 1 NA至ID2麻木1,ID 5至任一ID 4和6(6或5),和ID 10到ID 9值(10)。它可以很容易地在R中完成。无论如何要通过Sparklyr在Spark中做到这一点?

这里是我的R染料溶液:

example$Numb1 <- example$Numb[c(1,1:(nrow(example)-1))] 
example$Numb2 <- example$Numb[c(2:(nrow(example)), nrow(example))] 
example$Merge <- ifelse(is.na(example$Numb), ifelse(is.na(example$Numb1), 
example$Numb2, example$Numb1), example$Numb) 

    ID Cat Numb Numb1 Numb2 Merge 
1 1 a NA NA  1  1 
2 2 b 1 NA  3  1 
3 3 c 3  1  6  3 
4 4 d 6  3 NA  6 
5 5 e NA  6  5  6 
6 6 a 5 NA  4  5 
7 7 b 4  5  9  4 
8 8 c 9  4 10  9 
9 9 d 10  9 NA 10 
10 10 e NA 10 NA 10 

当然,事情可能会变得更加复杂,如果我有连续的行多个NA值。也许可以建议另一个建议。

但对于sparklyr,我不知道我能做什么。

回答

1

这是一个部分工作的解决方案,它包含一个SQL查询和dplyr包中的mutate函数。它没有解决连续行中多个NA值的情况,因为它是您的基本R解决方案的翻译,但它可能对其他(更完整的)方法有用。

我已经使用HiveQL的Lag and Lead函数来执行列向下移动和向上移动。它涉及创建一个新的辅助Spark表(example2),其中包含“Numb1”和“Numb2”列。然后,一旦辅助表已经创建,您可以用mutate

library(DBI) 
library(sparklyr) 
library(dplyr) 

set.seed(1)  
exampleDF <- data.frame (ID = 1:10, Cat = letters[1:5], 
         Numb = sample(c(NA, NA, NA, NA, 1:10), 10)) 

# Connection to Spark and creation of the table to test. 
sc <- spark_connect("local") 
example <- copy_to(sc, exampleDF) 

# Create a Spark table with columns Numb1 and Numb2 
DBI::dbSendQuery(sc, "CREATE TABLE example2 AS (SELECT ID, Cat, Numb, LAG(Numb, 1) over (PARTITION BY 1 ORDER BY ID) AS Numb1, 
      LEAD(Numb, 1) over (PARTITION BY 1 ORDER BY ID) AS Numb2 FROM exampledf)") 

# Load the auxiliary table as a Spark DataFrame 
ex2 <- tbl(sc, "example2") 

# Mutate in order to create the Merged column 
res <- ex2 %>% 
    mutate(Merged = ifelse(is.na(Numb), ifelse(is.na(Numb1), Numb2, Numb1), Numb)) 

res 

# Source: lazy query [?? x 6] 
# Database: spark_connection 
     id cat numb numb1 numb2 Merged 
    <int> <chr> <int> <int> <int> <int> 
1  1  a NA NA  1  1 
2  2  b  1 NA  3  1 
3  3  c  3  1  6  3 
4  4  d  6  3 NA  6 
5  5  e NA  6  5  6 
6  6  a  5 NA  4  5 
7  7  b  4  5  9  4 
8  8  c  9  4 10  9 
9  9  d 10  9 NA  10 
10 10  e NA 10 NA  10 

作为一个侧面说明创建“合并”列中,您也可避免通过使用mutate功能(以及所有ifelse S) COALESCE功能的手段。我认为这样会更有效率。

DBI::dbGetQuery(sc, "SELECT ID, Cat, Numb, COALESCE(Numb, Numb1, Numb2) AS Merged FROM example2") 
    ID Cat Numb Merged 
1 1 a NA  1 
2 2 b 1  1 
3 3 c 3  3 
4 4 d 6  6 
5 5 e NA  6 
6 6 a 5  5 
7 7 b 4  4 
8 8 c 9  9 
9 9 d 10  10 
10 10 e NA  10 

我希望这会有所帮助。

EDITED

如果你想避免使用SQL可言,你可以用dplyr功能做到这一点也:

example %>% arrange(ID) %>% 
    mutate(Numb1 = lag(Numb, 1)) %>% 
    mutate(Numb2 = lead(Numb, 1L)) %>% 
    mutate(Merged = ifelse(is.na(Numb), ifelse(is.na(Numb1), Numb2, Numb1), Numb)) 
# Source:  lazy query [?? x 6] 
# Database: spark_connection 
# Ordered by: ID 
     ID Cat Numb Numb1 Numb2 Merged 
    <int> <chr> <int> <int> <int> <int> 
1  1  a NA NA  1  1 
2  2  b  1 NA  3  1 
3  3  c  3  1  6  3 
4  4  d  6  3 NA  6 
5  5  e NA  6  5  6 
6  6  a  5 NA  4  5 
7  7  b  4  5  9  4 
8  8  c  9  4 10  9 
9  9  d 10  9 NA  10 
10 10  e NA 10 NA  10 
# ... with more rows 

我遇到了一些麻烦的编码两个连续mutate功能(这就是为什么我用首先是混合的SQL-dplyr方法)。我最终在sparklyr上打开了issue

+0

我认为滞后和领导是最有帮助的!谢谢Jaime! –

+0

@KevinZheng不客气:-) –