
I am trying to write an output file to HDFS using Scala and I am getting the error below:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:

For every 23 lines read, I need to write a single line to the output file.

Source code:

package com.mycode.logs; 

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs._ 
import org.apache.spark.SparkContext._ 
import org.apache.spark._ 
import org.apache.spark.deploy.SparkHadoopUtil 
import org.apache.spark.sql._ 
import org.apache.spark.sql.hive.HiveContext 
import scala.io._ 
import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; 
import java.io.PrintWriter; 

/** 
* @author RondenaR 
* 
*/ 
object NormalizeMSLogs{ 

  def main(args: Array[String]) {
    processMsLogs("/user/temporary/*file*")
  }

  def processMsLogs(path: String) {
    System.out.println("INFO: ****************** started ******************")

    // **** SetMaster is Local only to test *****
    // Set context
    val sparkConf = new SparkConf().setAppName("tmp-logs").setMaster("local")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val hiveContext = new HiveContext(sc)

    // Set HDFS
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    val hdfsconf = SparkHadoopUtil.get.newConfiguration(sc.getConf)
    hdfsconf.set("fs.defaultFS", "hdfs://192.168.248.130:8020")
    val hdfs = FileSystem.get(hdfsconf)

    val output = hdfs.create(new Path("hdfs://192.168.248.130:8020/tmp/mySample.txt"))
    val writer = new PrintWriter(output)

    val sourcePath = new Path(path)
    var count: Int = 0
    var lineF: String = ""

    hdfs.globStatus(sourcePath).foreach { fileStatus =>
      val filePathName = fileStatus.getPath().toString()
      val fileName = fileStatus.getPath().getName()

      val hdfsfileIn = sc.textFile(filePathName)
      val msNode = fileName.substring(1, fileName.indexOf("es"))

      System.out.println("filePathName: " + filePathName)
      System.out.println("fileName: " + fileName)
      System.out.println("hdfsfileIn: " + filePathName)
      System.out.println("msNode: " + msNode)

      for (line <- hdfsfileIn) {
        //System.out.println("line = " + line)
        count += 1

        if (count != 23) {
          lineF = lineF + line + ", "
        }

        if (count == 23) {
          lineF = lineF + line + ", " + msNode
          System.out.println(lineF)
          writer.write(lineF)
          writer.write("\n")
          count = 0
          lineF = ""
        }
      } // end for loop in file
    } // end foreach loop

    writer.close()
    System.out.println("INFO: ******************ended ******************")
    sc.stop()
  }
} 

You are trying to use 'writer' inside a distributed block, which looks suspicious to me. I would try 'map' instead of 'foreach'; then you get an RDD as a result that you can iterate over and read/write. In any case you will probably need a shuffle stage; IMO there is no avoiding it, since HDFS has its own ideas about how to distribute the files.
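A minimal sketch of what this comment suggests, assuming the variables hdfsfileIn, msNode and writer from the question are in scope; the name groupedLines and the output path /tmp/mySample_out are made up for illustration. The lines are regrouped into blocks of 23 with RDD transformations, so no driver-side writer is captured by the closure that Spark ships to the executors:

    // Hypothetical sketch: build the joined lines as an RDD instead of writing inside the loop.
    // zipWithIndex tags every line with its position so each block of 23 lines can be grouped.
    val groupedLines = hdfsfileIn
      .zipWithIndex()
      .map { case (line, idx) => (idx / 23, (idx % 23, line)) }
      .groupByKey()
      .map { case (_, numbered) =>
        numbered.toSeq.sortBy(_._1).map(_._2).mkString(", ") + ", " + msNode
      }

    // Either let Spark write the result itself (one part file per partition) ...
    groupedLines.saveAsTextFile("hdfs://192.168.248.130:8020/tmp/mySample_out")

    // ... or, if the result is small, bring it back to the driver and reuse the PrintWriter there.
    groupedLines.collect().foreach { joined =>
      writer.write(joined)
      writer.write("\n")
    }

The groupByKey step is the shuffle stage the comment mentions; it is the price of regrouping lines that may live in different partitions.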


After normalizing the file, could I output it to a list and, once the list is complete, load it into a HIVE table?
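If a Hive table is the end goal, one possibility (a sketch only, using the Spark 1.x HiveContext already created in the question; the table name normalized_ms_logs is made up) is to wrap the normalized lines in a DataFrame and save that as a table instead of going through a list:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    // Hypothetical: groupedLines is an RDD[String] of normalized lines, as in the sketch above.
    val schema = StructType(Seq(StructField("line", StringType)))
    val rowRDD = groupedLines.map(Row(_))
    val df = hiveContext.createDataFrame(rowRDD, schema)

    // "normalized_ms_logs" is an illustrative table name.
    df.write.mode("append").saveAsTable("normalized_ms_logs")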

Answer


It is not only the PrintWriter object writer that cannot be serialized: you also cannot use the SparkContext (sc) inside the foreach. It is a construct that only makes sense on the driver, and it makes no sense to send it over the wire to the workers.

You should take some time to think about what kinds of objects you send over the wire. Pointers, streams and handles make no sense there. Structures, strings and primitives: these are the things that it makes sense to include in a closure (or broadcast).
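As a concrete illustration of that advice, a minimal rewrite of the question's inner loop (assuming the surrounding variables hdfsfileIn, msNode, writer, count and lineF from the original code) keeps both the PrintWriter and the SparkContext on the driver by pulling the lines back with toLocalIterator:

    // Hypothetical rewrite of the inner loop. toLocalIterator streams the RDD's lines back
    // to the driver one partition at a time, so the closure shipped to the executors
    // contains no PrintWriter and no SparkContext.
    for (line <- hdfsfileIn.toLocalIterator) {
      count += 1
      if (count != 23) {
        lineF = lineF + line + ", "
      } else {
        writer.write(lineF + line + ", " + msNode + "\n")
        count = 0
        lineF = ""
      }
    }

If the data is too large to pass through the driver, the map/saveAsTextFile approach sketched under the first comment avoids the driver bottleneck entirely.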