0

I am running an existing Java package that uses java.util.logging.Logger on Spark, and I get the following error:

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:911) 
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:910) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:910) 
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:332) 
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreach(JavaRDDLike.scala:46) 
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.processFolder(RunPipelineWithSpark.java:271) 
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.process(RunPipelineWithSpark.java:179) 
    at edu.uth.clamp.nlp.main.RunPipelineWithSpark.main(RunPipelineWithSpark.java:136) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: java.util.logging.Logger 
Serialization stack: 
    - object not serializable (class: java.util.logging.Logger, value: java.util.logging.Logger@...)
    - field (class: edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer, name: logger, type: class java.util.logging.Logger)
    - object (class edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer, edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer@...)
    - field (class: edu.uth.clamp.nlp.uima.NameEntityUIMA, name: recognizer, type: class edu.uth.clamp.nlp.ner.CRFNameEntityRecognizer)
    - object (class edu.uth.clamp.nlp.uima.NameEntityUIMA, edu.uth.clamp.nlp.uima.NameEntityUIMA@...)
    - writeObject data (class: java.util.ArrayList) 

Answers

0

Your code presumably looks something like this:

NameEntityUIMA nameEntity = ...; 
JavaRDD<SomeType> rdd = ...; 
rdd.foreach(x -> /* code using nameEntity */); 

foreach has to serialize its argument in order to ship it to each node; because that argument uses nameEntity, nameEntity needs to be serialized too, but it cannot be (by the design of Java serialization this is only detected at runtime, not as a compile error). Instead, you want to create nameEntity on each partition. You could do

JavaRDD<SomeType> rdd = ...; 
rdd.foreach(x -> { 
    NameEntityUIMA nameEntity = ...; 
    /* code using nameEntity */ 
}); 

But this creates a new nameEntity for every element in the RDD, which performs horribly. Instead, use foreachPartition, as sketched below.
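
In sketch form (reusing the placeholders from the snippets above, so SomeType and the NameEntityUIMA construction are stand-ins rather than real code from the question):

JavaRDD<SomeType> rdd = ...;
rdd.foreachPartition(iter -> {
    NameEntityUIMA nameEntity = ...;   // built once per partition, on the executor
    while (iter.hasNext()) {
        SomeType x = iter.next();
        /* code using nameEntity */
    }
});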

0

Spark expects the functions passed into RDD/DStream transformations to be serializable. Since java.util.logging.Logger is not serializable, you should not put logging code inside those functions. You can replace the logging with a plain println, or try the options suggested here:

Apache Spark logging within Scala

Note that logging is still fine in the driver code. Also make sure the function does not reference any non-serializable variables from outside it. To better understand serialization problems caused by closures, study the closure concept: doc, doc2.
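
For instance, a minimal sketch (rdd and the messages are placeholders): the driver keeps its java.util.logging.Logger, while the code inside the closure only prints:

import java.util.logging.Logger;

Logger driverLogger = Logger.getLogger("RunPipelineWithSpark"); // created and used on the driver only
driverLogger.info("submitting job");

rdd.foreach(x -> {
    // runs on the executors: do not capture driverLogger here,
    // or the closure will again fail to serialize
    System.out.println("processed: " + x);
});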

2

Check whether you are trying to serialize the logger instance; make the logger field static or transient.
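
In Java that could look like the following sketch (whether CRFNameEntityRecognizer implements Serializable and the rest of its body are assumptions here; only the logger field matters):

import java.util.logging.Logger;

public class CRFNameEntityRecognizer implements java.io.Serializable {
    // static: the Logger belongs to the class, not the instance, so it is never
    // part of the serialized object graph.
    private static final Logger logger =
            Logger.getLogger(CRFNameEntityRecognizer.class.getName());

    // Alternative: a transient instance field is skipped during serialization,
    // but it will be null after deserialization unless it is re-initialized.

    // ... rest of the class unchanged
}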

1

Try using @transient lazy val when creating the logging object. Also, it is best to use it inside the closure, so that Spark itself takes care of this.

0

The logger is not serializable, and most likely you are trying to access it from the executors. I would suggest defining it as lazy:

lazy val logger = .... 

But the downside is that you should then not use the logger in the driver. Another, less elegant, option is to have a separate logger for the executors.