我遇到了一个奇怪的问题,在一个数据集上调用unpersist()
会影响同一代码块中另一个数据集的计数。不幸的是,这发生在一个复杂的长时间运行的工作中,有很多数据集,所以我不能在这里总结整个事情。我知道这是一个棘手的问题,但让我试着勾画出来。我正在寻找的是一些确认,这种行为是意想不到的,以及关于它为什么会发生或者我们如何避免它的任何想法。Dataset.unpersist()意外地影响了其他RDD的计数
编辑:此问题报告发生在Spark 2.1.1上,但不会在2.1.0上发生。问题是100%可重复的,但只有在我的项目中有1000行代码和数据,我正努力尝试将其提炼成一个简明的示例,但尚未能够实现,我将发布任何更新或重新提交我的问题,如果我找到了什么。事实上,完全相同的代码和数据在2.1.0而不是2.1.1中起作用,这使我相信这是由于Spark内的某些原因造成的。
val claims:Dataset = // read claims from file
val accounts:Dataset = // read accounts from file
val providers:Dataset = // read providers from file
val payers:Dataset = // read payers from file
val claimsWithAccount:Dataset = // join claims and accounts
val claimsWithProvider:Dataset = // join claims and providers
val claimsWithPayer:Dataset = // join claimsWithProvider and payers
claimsWithPayer.persist(StorageLevel.MEMORY_AND_DISK)
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 46
// This is considered unnecessary intermediate data and can leave the cache
claimsWithAccount.unpersist()
log.info("claimsWithPayer = " + claimsWithPayer.count()) // 41
本质上,在一系列连接主叫对中间数据集之一unpersist()
影响在后面的数据组中的一个的行数,如通过Dataset.count()
报道。
我的理解是,unpersist()
应该从缓存中删除数据,但不应该影响其他数据集的数量或内容?这是特别令人惊讶的,因为我在未执行其他数据之前明确坚持claimsWithPayer
。
“大致如此”并不够好。请尝试使用[mcve] - 它或者是__严重的正确性错误___([然后应该被报告])(https://issues.apache.org/jira/projects/SPARK/summary),并且有足够的信息来确定问题的来源)__或者这是你的错误(例如假设Spark没有提供任何保证的确定性行为)并且朝着最小的例子努力应该给出一些提示。另外请确保您使用的是最新的次要版本。 – zero323
像许多其他Spark问题一样,它只发生在某些数据和情况下。我花了很多时间把它简化为一个简明的例子,但还没有成功。我的问题是关于Spark的缓存语义,坚持和unpersist类似于这个问题https://stackoverflow.com/questions/29903675/understanding-sparks-caching –
所以就像我说:如果'claimsWithPayer'的每个祖先是确定性的(足够),那么持久性应该完全没有影响,这是一个错误。否则它可能会误解语义。这是由你来弄清楚是否是这种情况。如果您可以将问题简化为可管理的,我建议将其转发给开发人员列表。如果在2.1.0和2.1.1之间发生了变化,则更有可能有人负责人将能够识别该问题。 – zero323