2015-10-01 85 views
5

我是Spark,SparkR和一般所有HDFS相关技术的新手。我已经安装了最近的Spark 1.5.0并运行SparkR的一些简单代码:createDataFrame中的SparkR瓶颈?

Sys.setenv(SPARK_HOME="/private/tmp/spark-1.5.0-bin-hadoop2.6") 
.libPaths("/private/tmp/spark-1.5.0-bin-hadoop2.6/R/lib") 
require('SparkR') 
require('data.table') 

sc <- sparkR.init(master="local") 
sqlContext <- sparkRSQL.init(sc) 
hiveContext <- sparkRHive.init(sc) 

n = 1000 
x = data.table(id = 1:n, val = rnorm(n)) 

Sys.time() 
xs <- createDataFrame(sqlContext, x) 
Sys.time() 

代码立即执行。但是,当我将其更改为n = 1000000时,大约需要4分钟(两个Sys.time()调用之间的时间)。当我在控制台端口4040上检查这些作业时,作业n = 1000的持续时间为0.2s,作业为n = 1000000 0.3s。难道我做错了什么?

+0

我花了一段时间,因为在途中出现了一些意想不到的问题(在途中碰到一些其他错误,更别提我忘记了可以把它们作为数据帧列的奇怪的东西),但这应该在1.6.0 :[SPARK-11086](https://issues.apache.org/jira/browse/SPARK-11086) – zero323

回答

4

你没有做任何特别错误的事情。这只是一个不同因素组合的效果:

  1. createDataFrame因为它目前(Spark 1.5.1)的实施很慢。这是SPARK-8277中描述的已知问题。
  2. 当前的实施方式与data.table不匹配。
  3. 基数R相对较慢。聪明的人说这是一个功能而不是错误,但它仍然是需要考虑的事情。

直到SPARK-8277是解决有没有什么可以做,但有两个选择,你可以尝试:

  • 使用data.table普通的老data.frame代替。使用航班数据集(227496行,14列):

    df <- read.csv("flights.csv") 
    microbenchmark::microbenchmark(createDataFrame(sqlContext, df), times=3) 
    
    ## Unit: seconds 
    ##        expr  min  lq  mean median 
    ## createDataFrame(sqlContext, df) 96.41565 97.19515 99.08441 97.97465 
    ##  uq  max neval 
    ## 100.4188 102.8629  3 
    

    相比data.table

    dt <- data.table::fread("flights.csv") 
    microbenchmark::microbenchmark(createDataFrame(sqlContext, dt), times=3) 
    
    ## Unit: seconds   
    ##        expr  min  lq  mean median 
    ## createDataFrame(sqlContext, dt) 378.8534 379.4482 381.2061 380.043 
    ##  uq  max neval 
    ## 382.3825 384.722  3 
    
  • 写入到磁盘,并使用spark-csv直接将数据加载到数据帧星火不与R.直接互动很疯狂因为它的声音:

    dt <- data.table::fread("flights.csv") 
    
    write_and_read <- function() { 
        write.csv(dt, tempfile(), row.names=FALSE) 
        read.df(sqlContext, "flights.csv", 
         source = "com.databricks.spark.csv", 
         header = "true", 
         inferSchema = "true" 
        ) 
    } 
    
    ## Unit: seconds 
    ##    expr  min  lq  mean median 
    ## write_and_read() 2.924142 2.959085 2.983008 2.994027 
    ##  uq  max neval 
    ## 3.01244 3.030854  3 
    

我不会再同盟确定是否真的有意义推出可以在R处理的数据到Spark首先,但不要停留在此。

编辑

这个问题应该由SPARK-11086在星火1.6.0解决。

+0

我喜欢你最后的评论! :) – eliasah

+0

是的......我可能会偏向这里,但我觉得SparkR从R中获得了大部分乐趣,并且很少给予回报。即使在相对较小的数据上,Scala API也足够强大,值得考虑,尤其是在Breeze之外没有惯用的选择时。在Python中是50-50。但SparkR感觉就像一个笨拙的数据库驱动:) – zero323

+0

你能详细说明你的第二点为什么? data.table是一个data.frame,有许多方法可以访问类似data.frame的列。因此有点困惑。另外,在你的第三点上,什么比较慢?并在这方面的操作? – Arun