2016-11-18 57 views
3

我有一个来自IoT应用程序的带有JSON数据的Kafka代理。我从Spark Streaming应用程序连接到此服务器以执行一些处理。如何访问Spark Streaming应用程序中的缓存数据?

我想保存在内存(RAM)我的JSON数据的一些特定领域,我相信我可以使用cache()persist()运营商实现。

下一次当我在Spark Streaming应用程序中接收到新的JSON数据时,我检查内存(RAM)是否有可以检索的公用字段。如果是的话,我做一些简单的计算,最后我更新存储在内存中的字段的值(RAM)。

因此,我想知道我之前所描述的是否有可能。如果是,我必须使用cache()还是persist()?我怎样才能从记忆中检索我的领域?

回答

2

有可能与使用内存或磁盘中的Spark中应用的数据cache/persist(不一定只有星火流媒体应用程序 - 它是caching in Spark更普遍使用)。

但是......在星火流你有这样的使用情况被称为状态计算特殊支持。请参阅Spark Streaming Programming Guide探索可能性。

我认为你的使用案例mapWithState运营商正是你所追求的。

0

Spark不能这样工作。请以分散的方式思考。

对于保持RAM的第一部分。您可以使用cache()persist()任何人,默认情况下他们将数据保存在工作人员的内存中。

您可以从Apache Spark代码进行验证。

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) 

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ 
    def cache(): this.type = persist() 

据我了解你的用例,你需要UpdateStateByKey操作来实现你的第二个用例!

有关窗口的更多信息,请参阅here

相关问题