2017-06-13 53 views
2

是否有方法使用sparklyr/dplyr的函数复制Spark数据帧的行?R - 如何使用sparklyr复制火花数据帧中的行

sc <- spark_connect(master = "spark://####:7077") 

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df") 

这是所需的输出,保存到一个新的火花TBL:

> df2_tbl 
    row1 row2 
    <int> <chr> 
1  1  A 
2  1  A 
3  1  A 
4  2  B 
5  2  B 
6  2  B 
7  3  C 
8  3  C 
9  3  C 

回答

2

随着sparklyr可以使用arrayexplode通过@Oli的建议:

df_tbl %>% 
    mutate(arr = explode(array(1, 1, 1))) %>% 
    select(-arr) 

# # Source: lazy query [?? x 2] 
# # Database: spark_connection 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C  

和广义

library(rlang) 

df_tbl %>% 
    mutate(arr = !!rlang::parse_quo(
    paste("explode(array(", paste(rep(1, 3), collapse = ","), "))") 
)) %>% select(-arr) 

# # Source: lazy query [?? x 2] 
# # Database: spark_connection 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C 

在这里你可以轻松地调整行数。

1

我想到的第一是使用explode功能(它到底是什么,是指在理念火花)。然而,在SparkR中似乎并没有支持数组(据我所知)。

> structField("a", "array") 
Error in checkType(type) : Unsupported type for SparkDataframe: array 

不过,我可以提出其他两种方法:

  1. 一个简单但并不是很优雅的一个:

    head(rbind(df, df, df), n=30) 
    # row1 row2 
    # 1 1 A 
    # 2 2 B 
    # 3 3 C 
    # 4 1 A 
    # 5 2 B 
    # 6 3 C 
    # 7 1 A 
    # 8 2 B 
    # 9 3 C 
    

    或用一个for循环更多的通用性:

    df2 = df 
    for(i in 1:2) df2=rbind(df, df2) 
    

    请注意,这也可以与union一起使用。

  2. 第二,更优雅方法(因为它仅意味着一个火花操作)是基于与尺寸3的数据帧交叉联接(笛卡尔积)(或任何其它数目):

    j <- as.DataFrame(data.frame(s=1:3)) 
    head(drop(crossJoin(df, j), "s"), n=100) 
    # row1 row2 
    # 1 1 A 
    # 2 1 A 
    # 3 1 A 
    # 4 2 B 
    # 5 2 B 
    # 6 2 B 
    # 7 3 C 
    # 8 3 C 
    # 9 3 C 
    
+0

它应该是'array '而不是'array',例如'structField(“a”,“array ”)'。 – user8371915

0

我不知道R的rep函数的集群端版本。然而,我们可以使用连接模拟群集。

df_tbl <- copy_to(sc, data.frame(row1 = 1:3, row2 = LETTERS[1:3]), "df") 

replyr <- function(data, n, sc){ 
    joiner_frame <- copy_to(sc, data.frame(joiner_index = rep(1,n)), "tmp_joining_frame", overwrite = TRUE) 

    data %>% 
    mutate(joiner_index = 1) %>% 
    left_join(joiner_frame) %>% 
    select(-joiner_index) 

} 

df_tbl2 <- replyr(df_tbl, 3, sc) 
# row1 row2 
# <int> <chr> 
# 1  1 A  
# 2  1 A  
# 3  1 A  
# 4  2 B  
# 5  2 B  
# 6  2 B  
# 7  3 C  
# 8  3 C  
# 9  3 C 

它能够完成任务,但它是一个有点脏,因为tmp_joining_frame将持续存在。我不确定这样做多好,因为对函数的多次调用进行了懒惰评估。