0
我已经使用Kfka和mapwithsta函数编写了Spark应用程序。我已经创建了我的应用程序存储级别的快照将MemorywithStatedstream的默认预留模式从Memory仅渲染到其他模式
正如您所看到的,Kafka流在内存和磁盘中都被凝结了..但是我找不到一种方法来更改mapwithste内部流的默认存储。这个代码的PICE我使用
val messages=KafkaUtils.createDirectStream[String, String, (String,String)](ssc,
kafkaParams,
fromOffsets,
(r:org.apache.kafka.clients.consumer.ConsumerRecord[String,String]) =>(r.topic(),r.value()))
.persist(StorageLevel.MEMORY_AND_DISK_SER)
....
val mapped1=message.map(x=>(x._2.hashCode().toString(),x)).mapWithState(stateSpec1)
在我的应用程序张祖兴可以成为巨大的,所以我需要presiste内部张祖兴在emeory和disk..I将apprecite有这方面的帮助。
嗨,感谢您的回复,我正在寻找一种方法来压缩MapWithStaeRDD作为我的mapwithState函数的结果。由于默认情况下它的状态变得很大,所以预设模式是仅基于内存。我正在使用Spark 1.6.1。在我的情况下,键很大,超时是无限的..也许它使用UpdateStaeBykey取而代之,因为它的内部状态是Memory_Serlized ..但很想找到一种方法来压缩Mapwithstae RDD(甚至不能使用Kyro,因为OpenHashMapBasedStateMap在包中是私有的).. – mahdi62
另外我注意到,有时我的mapwithsrateRDD变成序列化(序列化1x复制),然后占用更少的空间..但大部分时间它是内存反序列化1x复制..不确定这个! – mahdi62