2016-12-28 40 views
0

我们有一个小配置表(大约50000条记录),每天更新一次。刷新缓存的数据帧?

我们有一个缓存的数据框用于这个表格,并且正在加入spark数据。在基本配置单元中加载新数据时,我们如何刷新数据框?

DataFrame tempApp = hiveContext.table("emp_data"); 

//Get Max Load-Date 
Date max_date = max_date = tempApp.select(max("load_date")).collect()[0].getDate(0); 

//Get data for latest date and cache. This will be used to join with stream data. 
DataFrame emp= hiveContext.table("emp_data").where("load_date='" + max_date + "'").cache(); 

// Get message from Kafka Stream 
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(....); 

JavaDStream<MobileFlowRecord> rddMobileFlorRecs = messages.map(Record::parseFromMessage); 

kafkaRecs.foreachRDD(rdd->{DataFrame recordDataFrame = hiveContext.createDataFrame(rdd, Record.class); 

DataFrame joinedDataSet = recordDataFrame.join(emp, 
recordDataFrame.col("application").equalTo(app.col("emp_id")); 
joinedDataSet. <Do furthur processing> 
}); 

回答

0

星火自动unpersist的RDD或数据帧,如果他们不再使用。为了知道是否缓存了RDD或Dataframe,可以进入Spark UI - > Storage tabl并查看内存详细信息。您可以使用df.unpersist()sqlContext.uncacheTable("sparktable")uncacheTable APi从内存中删除df或表。此选项在新的SparksessionAPi中不可用,但向后兼容始终存在。除非且直到您说出任何操作,否则Spark不会将任何数据加载或处理到RDD或DataFrame中。

因此对于您在执行join后,为您的Dataframe执行unpersist()。这将提高性能并解决您的问题。

Databricks

+0

我怀疑我是否理解你的解决方案。缓存和取消缓存数据集可能会解决问题,但会破坏缓存的目的,因为缓存仅适用于一次迭代。我已经添加了示例代码以获得更多的说明。其次,我测试了每次迭代的缓存和非缓存大约会延迟3秒。想知道是否有其他方法来实现这一目标? – Akhil

0

您可以手动完成。事情是这样的:

DataFrame refresh(DataFrame orig) { 
    if (orig != null) { 
     orig.unpersist(); 
    } 
    DataFrame res = get the dataframe as you normally would 
    res.cache() 
    return res 

现在,一旦调用这个每天或当你想刷新这样的:

DataFrame join_df = refresh(join_df) 

什么这主要的作用是unpersists以前的版本(删除缓存),读新的,然后缓存它。所以在实践中,数据帧被刷新。

您应该注意,数据帧只会在刷新后第一次在缓存为惰性时使用时才会保留在内存中。