我有一个使用Apache Spark的Java程序。该方案的最有趣的部分是这样的:Java Apache Spark:长转换链导致二次时间
long seed = System.nanoTime();
JavaRDD<AnnotatedDocument> annotated = documents
.mapPartitionsWithIndex(new InitialAnnotater(seed), true);
annotated.cache();
for (int iter = 0; iter < 2000; iter++) {
GlobalCounts counts = annotated
.mapPartitions(new GlobalCounter())
.reduce((a, b) -> a.sum(b)); // update overall counts (*)
seed = System.nanoTime();
// copy overall counts which CountChanger uses to compute a stochastic thing (**)
annotated = annotated
.mapPartitionsWithIndex(new CountChanger(counts, seed), true);
annotated.cache();
// adding these lines causes constant time complexity like i want
//List<AnnotatedDocument> ll = annotated.collect();
//annotated = sc.parallelize(ll, 8);
}
因此,实际上,行(**)导致的RDD
与形式
documents
.mapPartitionsWithIndex(initial)
.mapPartitionsWithIndex(nextIter)
.mapPartitionsWithIndex(nextIter)
.mapPartitionsWithIndex(nextIter)
... 2000 more
很长的地图的确链。另外,当需要更新计数时,行(*)在每次迭代时强制计算(非延迟)。
我的问题是,我得到的每次迭代线性增加了时间复杂度,因此二次整体:
我想这是因为星火试图以“记住”每RDD链,容错算法或任何导致这种增长的因素。但是,我真的不知道。
我真正想做的是在每次迭代时告诉Spark“折叠”RDD,以便只有最后一个保存在内存中并继续工作。这应该导致每次迭代的时间不变,我想。这可能吗?还有其他解决方案吗?
谢谢!
是否有任何理由为每次迭代缓存RDD?而不是在循环结束时缓存最近累积的RDD? –
我还是用缓存的效果试验,所以我的答案必须是“不是真的。” – bombax
你居然重用RDD每个计算?或者每次你想计算计数器时它是一个新的RDD? –