2016-11-27 129 views
2

我正在使用火花mapwithstate,但存储空间不断增加。spark-mapwithstate为什么存储空间的数量仍然是20?

enter image description here

问题1:

内存9GB

MapPartitionsRDD大小x 20则可以减少这种规模?

问题2:

And in InternalMapWithStateDStream, storagelevel is fixed to MEMORY_ONLY. 

我想改变坚持(StorageLevel.MEMORY_ONLY_SER),因为它的规模。可能吗?

问题3:

Private [streaming] object InternalMapWithStateDStream { 
    Private val DEFAULT_CHECKPOINT_DURATION_MULTIPLIER = 10 
} 

我想减少这种检查点值。可能吗?

回答

2

MapPartitionsRDD内存大小9GB x 20您能缩小这一尺寸吗?

它是9GB x 20,但它是如何分布在您的群集中的?您需要单击RDD名称以查看分布式状态。如果你想减少内存大小,你需要考虑一种有效的方式来表示你的数据。

我想更改为保留(StorageLevel.MEMORY_ONLY_SER),因为其大小为 。可能吗?

不,您不能覆盖MapWithStateDStream的内存设置。

我想减少此检查点值。可能吗?

是的,你可以设置DStream小号检查时间间隔:

dStream.mapWithState(spec).checkpoint(Seconds(4)) 
+0

谢谢你的回答。 ** 1。该集群分布良好**但您只需要最后一个状态存储,为什么还有剩余的19个存储? ** dStream.mapWithState(spec).checkpoint(秒(4))**不工作默认情况下运行10 interva'val kafkastateStream = chnlStream.mapWithState(stateSpec) kafkastateStream.checkpoint(Seconds(10 * 5)) val kafkaSnapshotStream = kafkastateStream.stateSnapshots()' –

+0

@HyunkeunLee *但你只需要最后一个状态存储,为什么你还剩下19个存储?*这完全取决于你如何存储状态。新来的国家有*不同的钥匙*?你在每次迭代中存储新状态吗?向我们展示代码。关于检查点,如果它不使用自定义间隔,它应该运行每个批处理时间*默认间隔*。 –

相关问题