
I get java.lang.NoClassDefFoundError: Could not initialize class when launching a Spark job via spark-submit. The Scala code looks like the following:

 
// Imports inferred from the identifiers used below; DefaultFormats is
// assumed to be json4s's.
import java.io.{File, FileReader}
import java.util.Properties

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Row, SaveMode}
import org.json4s.DefaultFormats

object ErrorTest {
  case class APIResults(status: String, col_1: Long, col_2: Double, ...)

  def funcA(rows: ArrayBuffer[Row])(implicit defaultFormats: DefaultFormats): ArrayBuffer[APIResults] = {
    // call some API, get results, and return them as APIResults
    ...
  }

  // MARK: load properties
  val props = loadProperties()
  private def loadProperties(): Properties = {
    val configFile = new File("config.properties")
    val reader = new FileReader(configFile)
    val props = new Properties()
    props.load(reader)
    props
  }

  def main(args: Array[String]): Unit = {
    val prop_a = props.getProperty("prop_a")

    val session = Context.initialSparkSession()  // project-specific helper
    import session.implicits._

    // aggregateByKey helpers: collect all rows that share a key into one buffer
    val initialSet = ArrayBuffer.empty[Row]
    val addToSet = (s: ArrayBuffer[Row], v: Row) => s += v
    val mergePartitionSets = (p1: ArrayBuffer[Row], p2: ArrayBuffer[Row]) => p1 ++= p2

    val sql1 =
      s"""
       select * from tbl_a where ...
      """

    session.sql(sql1)
      .rdd.map { row => { implicit val formats = DefaultFormats; (row.getLong(6), row) } }
      .aggregateByKey(initialSet)(addToSet, mergePartitionSets)
      .repartition(40)
      .map { case (rowNumber, rows) => { implicit val formats = DefaultFormats; funcA(rows) } }
      .flatMap(x => x)
      .toDF()
      .write.mode(SaveMode.Overwrite).saveAsTable("tbl_b")
  }
}

When I run it via spark-submit, it throws the error below: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$. But if I move val props = loadProperties() to the first line of the main method, the error goes away. Can anyone explain why this happens? Thanks a lot!

Caused by: java.lang.NoClassDefFoundError: Could not initialize class staging_jobs.ErrorTest$ 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at staging_jobs.ErrorTest$$anonfun$main$1.apply(ErrorTest.scala:208) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) 
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) 
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188) 
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193) 
    ... 8 more 
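
A minimal sketch of the working variant the question describes, assuming nothing else changes: the property loading moves to the first line of main, so the object body is empty and initializing ErrorTest$ on an executor has nothing left to fail on (Context.initialSparkSession and the job body are elided as in the snippet above):

import java.io.{File, FileReader}
import java.util.Properties

object ErrorTest {
  // No object-level vals: the static initializer of ErrorTest$ is trivial,
  // so executors can initialize the class even without config.properties.
  def main(args: Array[String]): Unit = {
    val props = loadProperties()   // moved here from the object body
    val prop_a = props.getProperty("prop_a")
    // ... rest of the job unchanged ...
  }

  private def loadProperties(): Properties = {
    val props = new Properties()
    props.load(new FileReader(new File("config.properties")))
    props
  }
}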

Answers

0

I ran into the same problem as you. I had defined a method convert outside the main method. When I used dataframe.rdd.map{x => convert(x)} inside main, NoClassDefFoundError: Could not initialize class Test$ occurred.

But when I used a function object convertor, with the same code as the convert method, inside the main method, no error occurred.

I was using Spark 2.1.0 with Scala 2.11. Could it be a bug in Spark?
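
A sketch of the two variants this answer contrasts. The body of convert is a placeholder, and the object-level property loading is assumed (as in the question) to read a file that exists on the driver but not on the executors:

import org.apache.spark.sql.{Row, SparkSession}

object Test {
  // Object-level initialization: runs in Test$'s static initializer and,
  // by assumption, fails on executors that lack config.properties.
  val props = {
    val p = new java.util.Properties()
    p.load(new java.io.FileReader("config.properties"))
    p
  }

  // Calling this from a closure compiles to Test$.MODULE$.convert(x),
  // which forces Test$'s failing initializer to run on the executor.
  def convert(row: Row): String = row.mkString(",")

  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder().getOrCreate()
    val dataframe = session.range(10).toDF()

    // Fails on executors with
    // NoClassDefFoundError: Could not initialize class Test$:
    //   dataframe.rdd.map(x => convert(x)).count()

    // Works: a function value defined inside main is shipped as a
    // self-contained closure and never references Test$ on the executor.
    val convertor = (row: Row) => row.mkString(",")
    dataframe.rdd.map(x => convertor(x)).count()
  }
}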

0

I think the problem is that val props = loadProperties() defines a member of the outer object, outside main. That member is then serialized (or run) on the executors, and the executors do not have the environment saved on the driver (here, the config.properties file in its working directory).
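
To make that concrete: a Scala object's vals are assigned in the static initializer of the generated class (ErrorTest$ here), which runs the first time any code on a given JVM touches the object. A minimal Spark-free sketch of the mechanics, assuming config.properties exists only on the driver:

object InitDemo {
  // Runs in InitDemo$'s static initializer. On a JVM whose working
  // directory has no config.properties, the load below throws: the first
  // reference fails with ExceptionInInitializerError, and every later
  // reference is reported as
  //   java.lang.NoClassDefFoundError: Could not initialize class InitDemo$
  val props = {
    val p = new java.util.Properties()
    p.load(new java.io.FileReader("config.properties"))
    p
  }
}

Moving the load into main, as observed in the question, or declaring the field as a lazy val so the file is only read where it is actually dereferenced (on the driver), keeps the class initializer trivial and avoids the error.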