2015-07-10 85 views
1

我有一个是这样的火花驱动程序:星火缓存策略

编辑 - 代码的早期版本不同&没有工作

var totalResult = ... // RDD[(key, value)] 
var stageResult = totalResult 

do { 
    stageResult = stageResult.flatMap(
    // Some code that returns zero or more outputs per input, 
    // and updates `acc` to number of outputs 
    ... 
).reduceByKey((x, y) => x.sum(y)) 

    totalResult = totalResult.union(stageResult) 
} while(stageResult.count() > 0) 

我从我的数据的性质认识这将最终终止(我本质上聚合了一个DAG中的节点)。

我不确定合理的缓存策略 - 我应该每次缓存stageResult循环吗?我是否设置了一个可怕的递归塔,因为每个totalResult都取决于它自己以前的所有化身?或者Spark会为我指出这一点?或者我应该将每个RDD结果放在一个数组中,并在最后采取一个大的联合?

建议在这里受欢迎,谢谢。

+0

首先,蓄电池,当谈到转型是不可靠的。 –

+0

为什么不呢?我只是创建一个累加器并在地图中使用它,然后在驱动程序中检查它。 –

+0

在这种情况下理论上它可能在这里工作,但由于机器故障和可能的重新计数,计数本身并不可靠。但是,既然你只关心它是否为零,那么它可能会适合你。另一个说明,但。如果这是你的最终代码,那么这将不会发生任何事情。有没有行动正在运行,所以计算(因此累加器将永远不会触发) –

回答

2

我想改写这个如下:

do { 
    stageResult = stageResult.flatMap(
    //Some code that returns zero or more outputs per input 
).reduceByKey(_+_).cache 

    totalResult = totalResult.union(stageResult) 
} while(stageResult.count > 0) 

我相当肯定(95%),其DAG中union使用的stageResult将是正确的引用(尤其是数应触发它),但是这可能需要重新检查。

然后当您拨打totalResult.ACTION时,它会将所有缓存的数据放在一起。

ANSWER基于新的问题

只要你拥有的内存空间,然后我的确会cache一切前进的道路上,因为它存储每个stageResultunion荷兰国际集团所有的数据点在数据结束。事实上,每个工会都不依赖于过去,因为这不是RDD.union的语义,它只是把它们放在一起。由于RDD不变性,您可以轻松更改代码以使用val

最后一点,也许DAG可视化将有助于理解为什么就不会有递归的后果:

DAG

+0

但我不想调用count,因为这会触发RDD的额外迭代,对吧? –

+0

你需要调用'count'或者'while'中的代码永远不会做任何事情。而且,如果你正在缓存数据,那么额外的迭代应该大大减少。 –

+0

我知道我需要打电话给我。我的问题是关于缓存如何工作的细节。 –