我正在运行一个Spark Scala程序,用于在输入文件中执行文本扫描。我试图通过使用rdd.mappartition来实现并行性。在mappartition部分内,我正在执行一些检查并调用map函数来实现每个分区的并行执行。在map函数中,我正在调用一个自定义方法,在那里执行扫描并将结果发送回去。Spark Map partiton不能在纱线簇模式下工作
现在,当我使用--master local [*]提交代码时,代码工作正常,但当我使用--master yarn-cluster提交代码时,代码无法正常工作。它的工作没有任何错误,但是调用并没有进入mappartition本身。我通过放置少量println语句来验证这一点。
请帮我提一下你的建议。 下面是示例代码:
def main(args: Array[String]) {
val inputRdd = sc.textFile(inputFile,2)
val resultRdd = inputRdd.mapPartitions{ iter =>
println("Inside scanning method..")
var scanEngine = ScanEngine.getInstance();
...
....
....
var mapresult = iter.map { y =>
line = y
val last = line.lastIndexOf("|");
message = line.substring(last + 1, line.length());
getResponse(message)
}
}
val finalRdd = sc.parallelize(resultRdd.map(x => x.trim()))
finalRdd.coalesce(1, true).saveAsTextFile(hdfsOutpath)
}
def getResponse(input: String): String = {
var result = "";
val rList = new ListBuffer[String]();
try {
//logic here
}
return result;
}
什么不起作用?你有堆栈跟踪吗? –
它正在工作,但在mappartition内写入的逻辑没有得到执行,当我在--master纱群集模式下运行 –
这条线很奇怪 –