2016-11-09 121 views
0

我已经使用Kfka和mapwithsta函数编写了Spark应用程序。我已经创建了我的应用程序存储级别的快照enter image description here将MemorywithStatedstream的默认预留模式从Memory仅渲染到其他模式

正如您所看到的,Kafka流在内存和磁盘中都被凝结了..但是我找不到一种方法来更改mapwithste内部流的默认存储。这个代码的PICE我使用

val messages=KafkaUtils.createDirectStream[String, String, (String,String)](ssc, 
    kafkaParams, 
    fromOffsets, 
(r:org.apache.kafka.clients.consumer.ConsumerRecord[String,String]) =>(r.topic(),r.value())) 
    .persist(StorageLevel.MEMORY_AND_DISK_SER) 
.... 
val mapped1=message.map(x=>(x._2.hashCode().toString(),x)).mapWithState(stateSpec1) 

在我的应用程序张祖兴可以成为巨大的,所以我需要presiste内部张祖兴在emeory和disk..I将apprecite有这方面的帮助。

回答

0

mapWithState是一个分布式内存状态存储。它将您的状态保存在名为OpenHashMapBasedStateMap的内部结构中。您目前持续的是由KafkaUtils.createDStream创建的KafkaRDD。如果你没有迭代两次相同的输入,就没有必要坚持下去。

请记住,即使您的内部状态很大,它应该均匀分布在您的群集中。这意味着你并不是把所有的鸡蛋都放在一个篮子里,而是把它放在整个集群里。如果您的状态增长,则可以随时通过其他节点扩展群集。

+0

嗨,感谢您的回复,我正在寻找一种方法来压缩MapWithStaeRDD作为我的mapwithState函数的结果。由于默认情况下它的状态变得很大,所以预设模式是仅基于内存。我正在使用Spark 1.6.1。在我的情况下,键很大,超时是无限的..也许它使用UpdateStaeBykey取而代之,因为它的内部状态是Memory_Serlized ..但很想找到一种方法来压缩Mapwithstae RDD(甚至不能使用Kyro,因为OpenHashMapBasedStateMap在包中是私有的).. – mahdi62

+0

另外我注意到,有时我的mapwithsrateRDD变成序列化(序列化1x复制),然后占用更少的空间..但大部分时间它是内存反序列化1x复制..不确定这个! – mahdi62