
I am new to the Spark world and need help with a small problem: I get a java.lang.ClassNotFoundException for a custom class when loading data into a JavaRDD in Apache Spark.

I have a local Spark standalone setup with one master and one worker, both running on the same machine.

I use Spring Boot and Gradle to build my Java application, which submits jobs to the Spark instance.

I have a service class that receives a JavaSparkContext:

public void loadTransactions(JavaSparkContext context) { 
    try { 
        // Pull all records out of the JPA repository into a plain list 
        List<TransactionRecord> transactionRecordList = new ArrayList<>(); 
        Iterable<TransactionRecord> all = trxRecordRepository.findAll(); 
        all.forEach(trx -> transactionRecordList.add(trx)); 
        System.out.println("Trx array list ready: " + transactionRecordList.size()); 

        // Distribute the list across 4 partitions; deserializing these 
        // partitions is where the executors need TransactionRecord on their classpath 
        JavaRDD<TransactionRecord> trxRecordRDD = context.parallelize(transactionRecordList, 4); 
        System.out.println(trxRecordRDD.count()); 
        System.out.println("data frame loaded"); 
    } catch (Exception e) { 
        // log the exception itself, not just its (possibly null) cause 
        logger.error("Error while loading transactions", e); 
    } finally { 
        context.close(); 
    } 
} 

When I execute this method, the list is successfully populated from the Spring Data JPA trxRecordRepository. The Spark job starts executing, but then fails with the following error:

2017-03-08 10:28:44.888 WARN 9021 --- [result-getter-2] o.apache.spark.scheduler.TaskSetManager : Lost task 1.0 in stage 0.0 (TID 1, 10.20.12.216, executor 0): java.io.IOException: java.lang.ClassNotFoundException: learning.spark.models.TransactionRecord 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1276) 
    at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) 
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) 
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:258) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassNotFoundException: learning.spark.models.TransactionRecord 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1919) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1529) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) 
    at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply$mcV$sp(ParallelCollectionRDD.scala:74) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) 
    at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:70) 
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1269) 
    ... 20 more 

If I instead load simple data from a text file, everything works fine:

JavaRDD<String> movieData = context.textFile("/Users/panshul/Development/sparkDataDump/ratings.csv", 4); 
long count = movieData.count(); 

My Gradle build file:

buildscript { 
    ext { 
     springBootVersion = '1.5.2.RELEASE' 
    } 
    repositories { 
     mavenCentral() 
    } 
    dependencies { 
     classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") 
    } 
} 

apply plugin: 'java' 
apply plugin: 'eclipse' 
apply plugin: 'idea' 
apply plugin: 'org.springframework.boot' 

jar { 
    baseName = 'spark-example' 
    version = '0.0.1-SNAPSHOT' 
} 

sourceCompatibility = 1.8 
targetCompatibility = 1.8 

repositories { 
    mavenCentral() 
    mavenLocal() 
} 


dependencies { 
    compile('org.springframework.boot:spring-boot-starter-web') { 
     exclude module: "spring-boot-starter-tomcat" 
    } 
    compile("org.springframework.boot:spring-boot-starter-jetty") 
    compile("org.springframework.boot:spring-boot-starter-actuator") 
    compile("org.springframework.boot:spring-boot-starter-data-jpa") 
    compile("mysql:mysql-connector-java:6.0.5") 
    compile("org.codehaus.janino:janino:3.0.6") 
    compile("org.apache.spark:spark-core_2.11:2.1.0") 
      { 
       exclude group: "org.slf4j", module: "slf4j-log4j12" 
      } 
    compile("org.apache.spark:spark-sql_2.11:2.1.0") 
      { 
       exclude group: "org.slf4j", module: "slf4j-log4j12" 
      } 
    testCompile("org.springframework.boot:spring-boot-starter-test") 
    testCompile("junit:junit") 
} 

Please help me figure out what I am doing wrong here. I am using Spark version 2.1.0, downloaded and installed from the Spark website, running on macOS Sierra.


Could you please share your build.gradle file? – semsorock

Answers


I had to create a jar containing all of my custom classes and place it in the jars folder of my Apache Spark installation.

That made the Spark master find my custom RDD type class and propagate it to the workers.
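For reference, a sketch of the steps (the jar name follows the Gradle build in the question and the Spark location is my local install; both paths may differ on your machine):

./gradlew jar 
cp build/libs/spark-example-0.0.1-SNAPSHOT.jar $SPARK_HOME/jars/ 
# restart the standalone master and worker so they pick up the new jar 
$SPARK_HOME/sbin/stop-all.sh && $SPARK_HOME/sbin/start-all.sh 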


I think your problem is that your class learning.spark.models.TransactionRecord is not on the classpath when you submit the job.

You have to either specify all dependent jars with the spark-submit --jars argument, or build one big jar that contains all dependencies.

I think the easiest way is to submit the jars individually, like this:

$SPARK_HOME/bin/spark-submit --name yourApp --class yourMain.class --master yourMaster --jars dependencyA.jar,dependencyB.jar job.jar 

(The list after --jars must be comma-separated without spaces, and the application jar itself is passed as the last argument.)
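For the fat-jar route, a minimal sketch using the Shadow plugin (the plugin choice, version, and settings here are an illustration, not something from the question's build):

plugins { 
    id 'com.github.johnrengelman.shadow' version '1.2.4' 
} 

// shadowJar produces build/libs/<name>-all.jar with all runtime dependencies merged in 
shadowJar { 
    zip64 = true // Spark plus Spring Boot easily exceeds the 65535-entry zip limit 
} 

Then run ./gradlew shadowJar and pass the resulting jar to spark-submit.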

Sharing your Gradle file would also help –


I am not submitting the job to Spark manually. I have a webservice running that just creates a SparkContext and calls the method, which automatically submits the job to the Spark instance. – Panshul


OK, but the cause of your problem will still be the same: I think the learning.spark.models.TransactionRecord class is missing from the dependencies passed to your Spark instance (I guess it lives in a project separate from your main Spark job application) –
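For the programmatic-submission case described in these comments, a minimal sketch of shipping the application jar from code (the master URL and jar path are assumptions, not taken from the question):

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 

// Sketch: list the jar that contains TransactionRecord in the SparkConf, 
// so it is shipped to every executor instead of relying on spark-submit --jars. 
SparkConf conf = new SparkConf() 
        .setAppName("spark-example") 
        .setMaster("spark://localhost:7077") // assumed master URL 
        .setJars(new String[] {"build/libs/spark-example-0.0.1-SNAPSHOT.jar"}); // assumed path 
JavaSparkContext context = new JavaSparkContext(conf); 

// Jars can also be added to an already-created context: 
// context.addJar("/path/to/your/models.jar"); 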
