2016-01-12 33 views
1

我想知道sparkR是否更容易合并大数据集而不是“常规R”?我有12个csv文件,大约500,000行40列。这些文件是2014年的月度数据。我想为2014年制作一个文件。这些文件都具有相同的列标签,我希望按第一列(年)合并。但是,有些文件比其他文件有更多的行。使用sparkR合并大数据集

当我运行下面的代码:

setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014") 

file_list <- list.files() 

for (file in file_list){ 

# if the merged dataset doesn't exist, create it 
if (!exists("dataset")){ 
dataset <- read.table(file, header=TRUE, sep="\t") 
} 

# if the merged dataset does exist, append to it 
if (exists("dataset")){ 
temp_dataset <-read.table(file, header=TRUE, sep="\t") 
dataset<-rbind(dataset, temp_dataset) 
rm(temp_dataset) 
} 

} 

[R坠毁。

当我运行这段代码:

library(SparkR) 
library(magrittr) 
# setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014\\Jan2014.csv") 
sc <- sparkR.init(master = "local") 
sqlContext <- sparkRSQL.init(sc) 

Jan2014_file_path <- file.path('Jan2014.csv') 

system.time(
housing_a_df <- read.df(sqlContext, 
         "C:\\Users\\Anonymous\\Desktop\\Data  2014\\Jan2014.csv", 
         header='true', 
         inferSchema='false') 
) 

我得到了以下错误:

Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0  in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): 

那么这将是在sparkR合并这些文件一个简单的方法?

+0

你读过[this](http://stackoverflow.com/questions/23169645/r-3-0-3-rbind-multiple-csv-files)吗?在第一节中,'file_list'csv文件中的所有文件是? –

+0

你说你想“按第一列合并”,但在你的示例代码中,你连接了来自不同文件的行。下面的答案(在撰写本文时)是关于合并=连接,而不是连接。 – kasterma

+0

请问下面有答案,回答你的问题?如果是,请接受答案。这可能有助于其他开发人员 – sag

回答

0

你应该以这种格式读取CSV文件: 编号:https://gist.github.com/shivaram/d0cd4aa5c4381edd6f85

# Launch SparkR using 
# ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3 

# The SparkSQL context should already be created for you as sqlContext 
sqlContext 
# Java ref type org.apache.spark.sql.SQLContext id 1 

# Load the local CSV file using `read.df`. Note that we use the CSV reader Spark package here. 
Jan2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Jan2014.csv", "com.databricks.spark.csv", header="true") 

Feb2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Feb2014.csv", "com.databricks.spark.csv", header="true") 

#For merging/joining by year 

#join 
    jan_feb_2014 <- join(Jan2014 , Feb2014 , joinExpr = Jan2014$year == Feb2014$year1, joinType = "left_outer") 
# I used "left_outer", so i want all columns of Jan2014 and matching of columns Feb2014, based upon your requirement change the join type. 
#rename the Feb2014 column name year to year1, as it gets duplicated while joining. Then you can remove the column "jan_feb_2014$year1" after joining by the code, "jan_feb_2014$year1 <- NULL" 

该如何通过一个文件加入一个。

+0

是否将列添加到其他daraframe的数据框?由于他想合并两个csv文件,我认为加入可能不适合他 – sag

+0

他想合并第一列“年”,所以我使用了加入。可能是他希望所有的月份都在列。@ SamuelAlexander –

0

一旦将文件作为数据帧读取,您可以使用SparkR的unionAll将数据帧合并到单个数据帧中。然后你可以把它写入一个单独的csv文件。

示例代码

df1 <- read.df(sqlContext, "/home/user/tmp/test1.csv", source = "com.databricks.spark.csv") 
    df2 <- read.df(sqlContext, "/home/user/tmp/test2.csv", source = "com.databricks.spark.csv") 
    mergedDF <- unionAll(df1, df2) 
    write.df(mergedDF, "merged.csv", "com.databricks.spark.csv", "overwrite") 

我测试过,并用它,但不反对你的尺寸的数据。 但我希望这会帮助你