2016-10-16 162 views
4

我有上面的代码作为spark驱动程序,当我执行我的程序时,它可以正常工作将所需数据保存为parquet文件。Spark java地图函数执行两次

 String indexFile = "index.txt"; 
     JavaRDD<String> indexData = sc.textFile(indexFile).cache(); 
     JavaRDD<String> jsonStringRDD = indexData.map(new Function<String, String>() { 
     @Override 
     public String call(String patientId) throws Exception { 
     return "json array as string" 
     } 
     }); 

//1. Read json string array into a Dataframe (execution 1) 
     DataFrame dataSchemaDF = sqlContext.read().json(jsonStringRDD); 
//2. Save dataframe as parquet file (execution 2) 
     dataSchemaDF.write().parquet("md.parquet"); 

但我观察到我在RDD indexData上的映射函数正在执行两次。 第一,当我读到jsonStringRdd如使用SQLContextDataFrame,当我写的dataSchemaDF到拼花文件

你可以指导我在这,如何避免这种重复执行?有没有其他更好的方法将json字符串转换为Dataframe?

+0

你在哪里看到两张地图? RDD's被懒惰地评估。 'map'操作是一个转换,而不是一个操作,所以'jsonStringRDD'的分配不应该立即运行。也许用于读取数据框和写入镶木地板的执行路径都需要收集RDD。 –

+0

我在mapper函数中有日志语句,我在日志中看到它们两次。 – blob

回答

6

我相信原因是JSON阅读器缺少模式。执行时:

sqlContext.read().json(jsonStringRDD); 

Spark必须推断新创建的DataFrame的模式。要做到这一点它具有扫描输入RDD,如果你想避免它急切地进行

这一步,你必须创建一个StructType它描述了JSON文件的形状:

​​

,并使用它时,你创建DataFrame

DataFrame dataSchemaDF = sqlContext.read().schema(schema).json(jsonStringRDD);