2014-07-01 54 views
1

我正在考虑对某些实时工作使用Apache Spark流,但我不确定如何缓存数据集以用于加入/查找。Apache Spark流 - 用于加入的缓存数据集

主要输入将是来自卡夫卡的包含Id的json记录,我想使用查找数据集将该id转换为名称。查找数据集驻留在Mongo Db中,但我希望能够将其缓存在spark过程中,因为数据集很少发生更改(每隔几个小时更改一次),所以我不想为每个输入记录点击mongo或重新加载所有记录在每个火花批次中,但我需要能够定期更新火花中保存的数据(例如,每2小时)。

这样做的最好方法是什么?

谢谢。

回答

2

我已经想了很久,很努力。特别是我想知道是否可以在Spark中实际实现一个数据库数据库。

那么答案是肯定的。首先,您需要一个首先将主要数据集缓存到内存中的程序,然后每隔几个小时执行一次优化的“连接 - 小型”更新主要数据集。现在显然Spark会有一种方法来实现一个小小的连接(也许它已经在1.0.0版本中了 - 我的堆栈在0.9.0之前一直停留在CDH 5.1.0版本之前)。

无论如何,您可以手动实现一个小型连接,通过采用定期双小时数据集并将其转化为HashMap,然后将其作为广播变量进行广播。这意味着HashMap将被复制,但每个节点只能复制一次(比较这只是参考Map - 它将在每个任务中复制一次 - 成本更高)。然后,您将使用广播地图添加主数据集并添加新记录。然后您可以定期(每晚)保存到hdfs或其他东西。

因此,这里是一些破旧的伪码阐明:

var mainDataSet: RDD[KeyType, DataType] = sc.textFile("/path/to/main/dataset") 
    .map(parseJsonAndGetTheKey).cache() 

everyTwoHoursDo { 
    val newData: Map[KeyType, DataType] = sc.textFile("/path/to/last/two/hours") 
    .map(parseJsonAndGetTheKey).toarray().toMap 

    broadcast(newData) 

    val mainDataSetNew = 
    mainDataSet.map((key, oldValue) => (key, 
     newData.get(key).map(newDataValue => 
     update(oldValue, newDataValue)) 
     .getOrElse(oldValue))) 
    .cache() 

    mainDataSetNew.someAction() // to force execution 

    mainDataSet.unpersist() 
    mainDataSet = mainDataSetNew 
} 

我还以为你能很聪明,并使用自定义partioner用自己的自定义索引,然后使用自定义的方式更新分区,以便每个分区本身都拥有一个子图。然后,您可以跳过更新分区,您知道这些分区不会包含newData中出现的任何键,还会优化更新过程。

我个人认为这是一个非常酷的想法,而好的是你的数据集已经准备好在内存中进行一些分析/机器学习。缺点是你有点重新发明轮子。当Datastax与Databricks(制作Spark的人)合作并最终支持某种类似的开箱即用的功能时,查看使用Cassandra可能会更好。

进一步阅读:

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

http://www.datastax.com/2014/06/datastax-unveils-dse-45-the-future-of-the-distributed-database-management-system

+1

如何使用可累积哈希映射,获取当地的累积值可能是有用的,特别是如果类似的数据结束在相同的分区 – aaronman

+0

谢谢 - 我会给它一个去。 – maj

0

这里是一个相当简单的工作流程:

对于数据的每一批次:

  1. 转换批次的JSON数据到数据框(b_df)。
  2. 从MongoDB读取查找数据集作为DataFrame(m_df)。然后缓存,m_df。缓存()
  3. 使用b_df.join加入数据(m_df,“join_field”)
  4. 执行所需的聚合,然后写入数据源。