我已经想了很久,很努力。特别是我想知道是否可以在Spark中实际实现一个数据库数据库。
那么答案是肯定的。首先,您需要一个首先将主要数据集缓存到内存中的程序,然后每隔几个小时执行一次优化的“连接 - 小型”更新主要数据集。现在显然Spark会有一种方法来实现一个小小的连接(也许它已经在1.0.0版本中了 - 我的堆栈在0.9.0之前一直停留在CDH 5.1.0版本之前)。
无论如何,您可以手动实现一个小型连接,通过采用定期双小时数据集并将其转化为HashMap,然后将其作为广播变量进行广播。这意味着HashMap将被复制,但每个节点只能复制一次(比较这只是参考Map - 它将在每个任务中复制一次 - 成本更高)。然后,您将使用广播地图添加主数据集并添加新记录。然后您可以定期(每晚)保存到hdfs或其他东西。
因此,这里是一些破旧的伪码阐明:
var mainDataSet: RDD[KeyType, DataType] = sc.textFile("/path/to/main/dataset")
.map(parseJsonAndGetTheKey).cache()
everyTwoHoursDo {
val newData: Map[KeyType, DataType] = sc.textFile("/path/to/last/two/hours")
.map(parseJsonAndGetTheKey).toarray().toMap
broadcast(newData)
val mainDataSetNew =
mainDataSet.map((key, oldValue) => (key,
newData.get(key).map(newDataValue =>
update(oldValue, newDataValue))
.getOrElse(oldValue)))
.cache()
mainDataSetNew.someAction() // to force execution
mainDataSet.unpersist()
mainDataSet = mainDataSetNew
}
我还以为你能很聪明,并使用自定义partioner用自己的自定义索引,然后使用自定义的方式更新分区,以便每个分区本身都拥有一个子图。然后,您可以跳过更新分区,您知道这些分区不会包含newData中出现的任何键,还会优化更新过程。
我个人认为这是一个非常酷的想法,而好的是你的数据集已经准备好在内存中进行一些分析/机器学习。缺点是你有点重新发明轮子。当Datastax与Databricks(制作Spark的人)合作并最终支持某种类似的开箱即用的功能时,查看使用Cassandra可能会更好。
进一步阅读:
http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
http://www.datastax.com/2014/06/datastax-unveils-dse-45-the-future-of-the-distributed-database-management-system
如何使用可累积哈希映射,获取当地的累积值可能是有用的,特别是如果类似的数据结束在相同的分区 – aaronman
谢谢 - 我会给它一个去。 – maj