2016-01-24 38 views
-2

我需要将SparkContext传递给我的函数,并建议我如何在下面的场景中执行此操作。如何将Spark上下文传递给来自foreach的函数

我有一个序列,每个元素指向我们从中获取RDD并处理它们的特定数据源。我已经定义了一个函数,它接受了spark上下文和数据源并做了必要的事情。我很遗憾地使用while循环。但是,我想用foreach或map来做,所以我可以暗示并行处理。我需要激发功能的上下文,但是我怎么能从foreach中传递它呢?

只是一个样本代码,我可以不存在实际的代码:

import org.apache.spark.SparkConf 
import org.apache.spark.SparkContext 
import org.apache.spark.sql.SQLContext 

object RoughWork { 
    def main(args: Array[String]) { 

    val str = "Hello,hw:How,sr:are,ws:You,re"; 
    val conf = new SparkConf 
    conf.setMaster("local"); 
    conf.setAppName("app1"); 
    val sc = new SparkContext(conf); 
    val sqlContext = new SQLContext(sc); 

    val rdd = sc.parallelize(str.split(":")) 
    rdd.map(x => {println("==>"+x);passTest(sc, x)}).collect(); 

    } 

    def passTest(context: SparkContext, input: String) { 
    val rdd1 = context.parallelize(input.split(",")); 
    rdd1.foreach(println) 
    } 
} 
+0

你现在可以举一个你的代码的简单例子,以及到目前为止你已经尝试过了吗? – sgvd

+0

我不能给实际的代码,但提供了一个示例代码,需要解决.....这只是一个样本,使其工作... – Srini

回答

1

无法通过周围的SparkContext这样。 passTest将在执行程序上运行,而SparkContext在驱动程序上运行。

如果我不得不做这样的双重分裂,一种方法是使用flatMap

rdd 
    .zipWithIndex 
    .flatMap(l => { 
    val parts = l._1.split(","); 
    List.fill(parts.length)(l._2) zip parts}) 
    .countByKey 

可能有更漂亮的方式,但基本的想法是,你可以使用zipWithIndex跟踪哪一行来自一个项目,然后使用键值对RDD方法来处理您的数据。

如果您拥有多个关键字或一般更多的结构化数据,您可以考虑使用带有DataFrame(或最新版本中的DataSet)的Spark SQL,而不是flatMap而不是explode

+0

你可以放心地忽略_pretty_甚至_I'm非常sure_ :) – zero323

+0

谢谢。我知道这是不可能的。但想要检查一下,因为有人会遇到这种情况。任何方式,想删除这个问题 – Srini

相关问题