2016-03-21 26 views
6

我有一个使用Apache Spark的Java程序。该方案的最有趣的部分是这样的:Java Apache Spark:长转换链导致二次时间

long seed = System.nanoTime(); 

JavaRDD<AnnotatedDocument> annotated = documents 
    .mapPartitionsWithIndex(new InitialAnnotater(seed), true); 
annotated.cache(); 

for (int iter = 0; iter < 2000; iter++) { 
    GlobalCounts counts = annotated 
     .mapPartitions(new GlobalCounter()) 
     .reduce((a, b) -> a.sum(b)); // update overall counts (*) 

    seed = System.nanoTime(); 

    // copy overall counts which CountChanger uses to compute a stochastic thing (**) 
    annotated = annotated 
     .mapPartitionsWithIndex(new CountChanger(counts, seed), true); 
    annotated.cache(); 

    // adding these lines causes constant time complexity like i want 
    //List<AnnotatedDocument> ll = annotated.collect(); 
    //annotated = sc.parallelize(ll, 8); 
} 

因此,实际上,行(**)导致的RDD与形式

documents 
    .mapPartitionsWithIndex(initial) 
    .mapPartitionsWithIndex(nextIter) 
    .mapPartitionsWithIndex(nextIter) 
    .mapPartitionsWithIndex(nextIter) 
    ... 2000 more 

很长的地图的确链。另外,当需要更新计数时,行(*)在每次迭代时强制计算(非延迟)。

我的问题是,我得到的每次迭代线性增加了时间复杂度,因此二次整体:

enter image description here

我想这是因为星火试图以“记住”每RDD链,容错算法或任何导致这种增长的因素。但是,我真的不知道。

我真正想做的是在每次迭代时告诉Spark“折叠”RDD,以便只有最后一个保存在内存中并继续工作。这应该导致每次迭代的时间不变,我想。这可能吗?还有其他解决方案吗?

谢谢!

+0

是否有任何理由为每次迭代缓存RDD?而不是在循环结束时缓存最近累积的RDD? –

+0

我还是用缓存的效果试验,所以我的答案必须是“不是真的。” – bombax

+0

你居然重用RDD每个计算?或者每次你想计算计数器时它是一个新的RDD? –

回答

6

尝试使用rdd.checkpoint。这将把RDD保存到hdfs并清除血统。

每次您转换一个RDD时,您都会生长血统,并且Spark必须跟踪可用的内容和必须重新计算的内容。处理DAG非常昂贵,大型DAG往往会很快地导致性能下降。通过“检查点”,您指示Spark计算并保存结果RDD,并丢弃它如何创建的信息。这使得它类似于简单地保存RDD并将其读回,这最小化了DAG操作。

在阿里纳斯,因为你打这个问题,这是件好事,加入steps知道union也影响RDD性能,也可以抛出StackOverflowError由于道路血统信息。 See this post

This link有更好的图表细节和主题也提到in this SO post

+0

你会考虑更新你的答案吗? –

+0

当然 - 忘了一切。 (工作阻止SO帖子) –

3

这是一个非常有趣的问题,需要考虑几件事情。

从根本上说,这是一个迭代算法,如果你看看Spark中的一些不同的迭代机器学习算法,你可以看到一些处理这类问题的方法。

他们大多数人不会在每次迭代中缓存的第一件事 - 而他们有一个可配置的缓存间隔。我可能会开始缓存每10次迭代,并看看如何。

另一个问题变成谱系图,你所做的每个mapPartitions正在增长一点点。在某些情况下,跟踪数据将开始变得越来越昂贵。 checkpoint允许您将Spark将当前的RDD写入持久性存储并丢弃沿袭信息。您可以尝试在每隔20次迭代的某个时间间隔内执行此操作,并查看结果如何。

的10个20号都只是某种基本的出发点,他们依赖于它是如何缓慢的计算数据为每个迭代,你可以和他们一起玩,以找到适合您的工作的正确调整。

1
  • 尝试使用annotated.count()每隔几次(需要调整)迭代缓存之前实现您的rdd。
  • 最好控制rdd缓存在哪里(...)而不是缓存(),它将rdd放在内存中,持久允许你选择它去哪里(取决于你的内存可用性)
  • 它更好以“保存”缓存/保留的rdd,然后在缓存/保留下一个循环后取消执行。 Spark自己做,但如果你控制它,火花将不需要选择从缓存抛出哪个rdd