checkpoint
,与Spark中的许多其他操作一样,是懒惰的。数据实际上是检查点,当且仅当给定的RDD被实现时。您看到的空目录是特定于应用程序的检查点目录。
如果您希望检查点发生,您必须触发一个操作来评估相应的RDD。举例(本地模式):
import glob
import os
from urllib.parse import urlparse
sc.setCheckpointDir("/tmp/checkpoints/")
ch_dir = os.path.join(urlparse(sc._jsc.getCheckpointDir().orElse("")).path, "*")
rdd = sc.range(1000, 10)
plus_one = rdd.map(lambda x: x + 1)
plus_one.cache()
plus_one.checkpoint() # No checkpoint dir here yet
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## []
plus_one.isCheckpointed()
## False
# After count is executed you'll see rdd specific checkpoint dir
plus_one.count()
[os.path.split(x)[-1] for x in glob.glob(ch_dir)]
## ['rdd-1']
plus_one.isCheckpointed()
## True
您还可以分析调试字符串之前:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:475 [Memory Serialized 1x Replicated]
和行动后:
## (8) PythonRDD[1] at RDD at PythonRDD.scala:48 [Memory Serialized 1x Replicated]
## | CachedPartitions: 8; MemorySize: 168.0 B; ExternalBlockStoreSize: 0.0 B; DiskSize: 0.0 B
## | ReliableCheckpointRDD[3] at count at <ipython-input-16-96e746c56973>:1 [Memory Serialized 1x Replicated]
正如你可以看到RDD将被计算之前从零开始,但在count
之后,您将获得ReliableCheckpointRDD
。