2015-07-03 86 views
3

我想了解扩展到哪一个必须编译一个罐子使用火花。与罐子火花工作流程

我通常会在IDE中编写临时分析代码,然后通过单击(在IDE中)对数据进行本地运行。如果我的Spark实验给了我正确的指示,那么我必须将我的脚本编译成jar,并将它发送到所有Spark节点。即我的工作流程是

  1. 写分析脚本,将上传自身的罐子(下面创建 )
  2. 转到使罐子。
  3. 运行脚本。

对于临时迭代工作,这似乎有点多,我不明白如果没有它,REPL就会如何消失。

更新:

下面是一个例子,我不能去上班,除非我编译成一个罐子,做了sc.addJar。但是我必须这样做的事实看起来很奇怪,因为只有普通的Scala和Spark代码。

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.SparkFiles 
import org.apache.spark.rdd.RDD 

object Runner { 
    def main(args: Array[String]) { 
    val logFile = "myData.txt" 
    val conf = new SparkConf() 
     .setAppName("MyFirstSpark") 
     .setMaster("spark://Spark-Master:7077") 

    val sc = new SparkContext(conf) 

    sc.addJar("Analysis.jar") 
    sc.addFile(logFile) 

    val logData = sc.textFile(SparkFiles.get(logFile), 2).cache() 

    Analysis.run(logData) 
    } 
} 

object Analysis{ 
    def run(logData: RDD[String]) { 
    val numA = logData.filter(line => line.contains("a")).count() 
    val numB = logData.filter(line => line.contains("b")).count() 
    println("Lines with 'a': %s, Lines with 'b': %s".format(numA, numB)) 
    } 
} 
+0

您错过了一个重要的观点:如果您的数据足够小,可以在本地进行高效处理,那么您不需要火花或任何技术来分配计算。 – C4stor

+0

我的数据会很大(在HDFS中),但是当我重复我的方法时,我的计算会发生变化。我越来越接近局部的局限,因此探索了火花。 – Pengin

回答

1

您正在使用“过滤器”的创建一个匿名函数工作:

scala> (line: String) => line.contains("a") 
res0: String => Boolean = <function1> 

该函数的生成的名称不可用除非罐子被分配给工人。工人堆栈跟踪是否突出显示缺少的符号?

如果你只是想在不分发您可以使用“本地”大师的jar本地调试:

val conf = new SparkConf().setAppName("myApp").setMaster("local") 
+0

假设REPL上会发生同样的事情?即如果我尝试使用这样的函数,那么它在启动REPL时需要先前已编译和声明。 – Pengin

1

在创建JAR文件是处理长期运行星火作业,互动发展工作星火最常见的方式有可用的炮弹直接在Scala中,巨蟒& R.目前的快速入门指南(https://spark.apache.org/docs/latest/quick-start.html)只提到Scala & Python外壳,但SparkR指南讨论了如何交互式地使用SparkR(请参阅https://spark.apache.org/docs/latest/sparkr.html)。祝您好运与您的旅途成为星火的,你发现自己有更大的数据集:)

1

您可以使用SparkContext.jarOfObject(Analysis.getClass)来自动包括JAR那你想分发而不用自己打包。

查找从中加载给定类的JAR,以便 用户将其JAR传递到SparkContext。

def jarOfClass(cls: Class[_]): Option[String] 
def jarOfObject(obj: AnyRef): Option[String] 

你想要做的事,如:

sc.addJar(SparkContext.jarOfObject(Analysis.getClass).get) 

HTH!