2017-02-16 33 views
-1

我正在运行一个Spark Scala程序,用于在输入文件中执行文本扫描。我试图通过使用rdd.mappartition来实现并行性。在mappartition部分内,我正在执行一些检查并调用map函数来实现每个分区的并行执行。在map函数中,我正在调用一个自定义方法,在那里执行扫描并将结果发送回去。Spark Map partiton不能在纱线簇模式下工作

现在,当我使用--master local [*]提交代码时,代码工作正常,但当我使用--master yarn-cluster提交代码时,代码无法正常工作。它的工作没有任何错误,但是调用并没有进入mappartition本身。我通过放置少量println语句来验证这一点。

请帮我提一下你的建议。 下面是示例代码:

def main(args: Array[String]) { 

    val inputRdd = sc.textFile(inputFile,2) 
    val resultRdd = inputRdd.mapPartitions{ iter => 

    println("Inside scanning method..") 
    var scanEngine = ScanEngine.getInstance(); 
    ... 
    .... 
    .... 
    var mapresult = iter.map { y => 
     line = y 
     val last = line.lastIndexOf("|"); 
     message = line.substring(last + 1, line.length()); 
     getResponse(message) 
    } 
    } 

    val finalRdd = sc.parallelize(resultRdd.map(x => x.trim())) 
    finalRdd.coalesce(1, true).saveAsTextFile(hdfsOutpath) 

} 

def getResponse(input: String): String = { 
    var result = ""; 
    val rList = new ListBuffer[String](); 

    try { 
     //logic here 
    } 
    return result; 
} 
+0

什么不起作用?你有堆栈跟踪吗? –

+0

它正在工作,但在mappartition内写入的逻辑没有得到执行,当我在--master纱群集模式下运行 –

+1

这条线很奇怪 –

回答

1

如果它的工作是看到内扫描方法的证据..打印出来,也不会当,因为该代码被执行群集上运行出现工人,而不是司机。

你将不得不在法庭细节中仔细阅读代码,以开放的心态去尝试找出工作没有输出的原因。通常,当作业在本地模式下工作时,而不是在群集上时,这是由于代码在其中执行的位置或记录输出的位置的微妙之处。

有太多限制的代码来提供更具体的答案。

+0

编辑解决实际问题! – ImDarrenG

+0

我正在从mappartition中调用的方法返回一个迭代器....我没有在这里发布完整的程序。此外,我使用一个可变列表集合来保存多个返回的结果,并最终使用List.mkString(“\ n”)方法将其转换为字符串,然后将其返回...我怀疑Mutable集合的用法是问题....你对此有何看法? –

+0

@DILIPKUMAR可变集合的使用不应该导致问题。 'getResponse'中的'try'语句防止了异常被报告? – ImDarrenG

0

Spark使用map函数以及mapPartitions实现了并行性。分区的数量决定了并行的数量,但无论您是否使用mapPartitions函数,每个分区都将独立执行。

只有几个理由使用mapPartitions而不是map;例如功能的初始化成本很高,但可以多次调用它,例如在文本上执行一些NLP任务

相关问题