2015-12-15 41 views
0

我想模拟一个遗传学问题,我们试图解决,建立在它的步骤。我可以成功运行Spark示例中的PiAverage示例。这个例子“掷飞镖”在一个圆圈(我们的情况是10^6),并计算“落在圆圈内”的数字来估计PI在Apache Spark中,我可以轻松地重复/嵌套一个SparkContext.parallelize吗?

假设我想重复这个过程1000次(并行)和平均所有这些估计。我试图看到最好的方法,似乎有两个并行化调用?嵌套调用?有没有办法将地图连接在一起或减少呼叫?我看不到它。

我想知道类似下面的想法的智慧。我想使用累加器跟踪结果估计。 jsc是我的SparkContext,单次运行的完整代码在问题结束时,感谢您的任何输入!

Accumulator<Double> accum = jsc.accumulator(0.0); 

// make a list 1000 long to pass to parallelize (no for loops in Spark, right?) 
List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES); 

// pass this "dummy list" to parallelize, which then 
// calls a pieceOfPI method to produce each individual estimate 
// accumulating the estimates. PieceOfPI would contain a 
// parallelize call too with the individual test in the code at the end 
jsc.parallelize(numberOfEstimates).foreach(accum.add(pieceOfPI(jsc, numList, slices, HOW_MANY_ESTIMATES))); 

// get the value of the total of PI estimates and print their average 
double totalPi = accum.value(); 

// output the average of averages 
System.out.println("The average of " + HOW_MANY_ESTIMATES + " estimates of Pi is " + totalPi/HOW_MANY_ESTIMATES); 

它似乎并不像一个矩阵或其他答案我看到这样就给回答这个具体问题,我做了几个搜索,但我没有看到如何做到这一点没有“并行的并行化。 “这是一个坏主意吗?

(是的,我从数学角度认识到,我可以做更多的估计,并且得到相同的结果:)试图建立一个我的老板想要的结构,再次感谢!

我已经把我的整个单测试程序放在这里,如果有帮助的话,那就是我正在测试的累加器。这个核心会变成PieceOfPI():

import java.io.Serializable; 
import java.util.ArrayList; 
import java.util.List; 

import org.apache.spark.Accumulable; 
import org.apache.spark.Accumulator; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.api.java.function.Function2; 
import org.apache.spark.storage.StorageLevel; 
import org.apache.spark.SparkConf; 
import org.apache.spark.storage.StorageLevel; 

public class PiAverage implements Serializable { 

public static void main(String[] args) { 

    PiAverage pa = new PiAverage(); 
    pa.go(); 

} 

public void go() { 

    // should make a parameter like all these finals should be 
    // int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; 
    final int SLICES = 16; 

    // how many "darts" are thrown at the circle to get one single Pi estimate 
    final int HOW_MANY_DARTS = 1000000; 

    // how many "dartboards" to collect to average the Pi estimate, which we hope converges on the real Pi 
    final int HOW_MANY_ESTIMATES = 1000; 

    SparkConf sparkConf = new SparkConf().setAppName("PiAverage") 
     .setMaster("local[4]"); 

    JavaSparkContext jsc = new JavaSparkContext(sparkConf); 

    // setup "dummy" ArrayList of size HOW_MANY_DARTS -- how many darts to throw 
    List<Integer> throwsList = new ArrayList<Integer>(HOW_MANY_DARTS); 
    for (int i = 0; i < HOW_MANY_DARTS; i++) { 
     throwsList.add(i); 
    } 

    // setup "dummy" ArrayList of size HOW_MANY_ESTIMATES 
    List<Integer> numberOfEstimates = new ArrayList<Integer>(HOW_MANY_ESTIMATES); 
    for (int i = 0; i < HOW_MANY_ESTIMATES; i++) { 
     numberOfEstimates.add(i); 
    } 

    JavaRDD<Integer> dataSet = jsc.parallelize(throwsList, SLICES); 

    long totalPi = dataSet.filter(new Function<Integer, Boolean>() { 
     public Boolean call(Integer i) { 
      double x = Math.random(); 
      double y = Math.random(); 
      if (x * x + y * y < 1) { 
       return true; 
      } else 
       return false; 
     } 
    }).count(); 

    System.out.println(
      "The average of " + HOW_MANY_DARTS + " estimates of Pi is " + 4 * totalPi/(double)HOW_MANY_DARTS); 

    jsc.stop(); 
    jsc.close(); 
} 
} 
+0

只是在这个问题的一个小背景下,我的老板看到了创建RDD的构造,然后分配了一个映射函数的输出。他想知道为什么需要“额外的RDD”,因为地图会生成额外的RDD。这可能是一个单独的问题,但是激发了我的问题,即一系列地图是否可以链接,但是迭代次数不同,就像我做的{for j do}循环 – JimLohse

回答

1

让我从你的“背景问题”开始。像map,join,groupBy等转换操作分为两类;那些需要将所有分区的数据作为输入进行洗牌的数据,以及那些不需要数据的数据。像groupByjoin这样的操作需要洗牌,因为您需要将所有RDD分区中的所有记录与相同的密钥汇总在一起(想想SQL JOINGROUP BY操作是如何工作的)。另一方面,map,flatMap,filter等不需要混洗,因为该操作对上一步的分区的输入工作正常。他们一次处理单个记录,而不是使用匹配键的组。因此,不需要洗牌。

这个背景对于理解“额外地图”没有显着开销是必要的。像map,flatMap等一系列操作被“压缩”在一起,形成一个“阶段”(当您查看Spark Web控制台中的作业的详细信息时显示),以便只有一个RDD被物化,舞台的尽头。

解答你的第一个问题。我不会为此使用累加器。它们用于“边带”数据,如计算您分析的多少坏线。在这个例子中,您可以使用累加器来计算1个半径内外的(x,y)对的数量,例如。

Spark分配中的JavaPiSpark示例与其得到的一样好。你应该研究它为什么起作用。这是大数据系统的正确数据流模型。你可以使用“聚合器”。在Javadocs中,单击“索引”并查看aggaggregateaggregateByKey函数。但是,在这里它们不再可以理解,也没有必要。它们提供了比map然后reduce更大的灵活性,所以他们是值得了解的

与您的代码的问题是,你实际上想告诉星火做什么,而不是表达自己的意图,让星火优化是怎样做的了您。

最后,我建议你购买并学习O'Reilly的“Learning Spark”。它很好地解释了内部细节,如分段,并且还显示了大量可用的示例代码。

+0

非常有用,每个单词!我有这本书,每次我把头伸进去,我都会学到一些令人惊喜的东西。感谢你让我在这么多层面上直线前进。我认为你的答案在很多层面上都会出现,很多Spark新人会觉得这很有帮助。 – JimLohse

+0

因此,在重新审视了学习火花书后,我仍然非常喜欢你的答案,但我不知道如何完成这两件基本的事情。我可能会尝试发布更通用的其他问题?这两个问题是1)假设我没有一个固定长度的数据集开始,并且需要告诉Spark执行x次操作(x = 10^6),创建一个虚拟数组并且传递它似乎很奇怪并行化,但这是我迄今为止所能看到的。我应该编辑问题还是发布单独的问题? – JimLohse

+0

和“嵌套”问题:2)如果我想运行b()10^6次作为“内部循环”,然后对b()的结果运行()作为“外部循环”吗?嵌套并行调用? (是的,你可能会碰到墙壁,但我仍然没有看到那本书中最好的方法抱歉) – JimLohse

相关问题