2014-05-12 69 views
1

我是Spark & Scala新手。从Spark RDD读取Kryo文件

我需要阅读和分析星火,它写在KRYO我的Scala代码文件系列化:

import com.esotericsoftware.kryo.Kryo 
import com.esotericsoftware.kryo.io.Output 

val kryo:Kryo = new Kryo() 
val output:Output = new Output(new FileOutputStream("filename.ext",true)) 

//kryo.writeObject(output, feed) (tested both line) 
kryo.writeClassAndObject(output, myScalaObject) 

这是创造我的对象(myScalaObject)的文件序列化的伪码这是一个复杂的对象。

文件似乎写的很好,但我有问题,当我在星火RDD阅读

伪代码星火:

val conf = new SparkConf() 
    .setMaster("local") 
    .setAppName("My application") 
    .set("spark.executor.memory", "1g") 


conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
conf.set("spark.kryo.registrator", "myScalaObject") 

val sc = new SparkContext(conf) 

val file=sc.objectFile[myScalaObject]("filename.ext") 

val counts = file.count() 

当我试图执行它,我收到此错误:

org.apache.spark.SparkException:作业中止:任务0.0:0失败1次(最近的失败:异常故障:java.io.IOException的:文件:FILENAME.EXT不是SequenceFile)

是否可以在Spark中读取这种类型的文件?

如果这种解决方案是不可能的,那么创建一个复杂的文件结构来读取Spark的好方法是什么?

谢谢

+1

'objectFile'用于加载保存为含有序列化对象一个SequenceFile一个'RDD'。为什么不使用Kryo读取对象并使用'parallel'来生成'RDD'? – zsxwing

+0

@zsxwing谢谢你,很好主意,我试了一下。但我有很多小的(5-20​​mb),并不想并行化文件的内容。有没有什么方法可以并行化文件名,然后每个服务器读取它的文件? – faster2b

+1

用文件名创建一个RDD并用'map'读取内容? – zsxwing

回答

2

如果你想与objectFile阅读,与saveAsObjectFile写出来的数据。

val myObjects: Seq[MyObject] = ... 
val rddToSave = sc.parallelize(myObjects) // Or better yet: construct as RDD from the start. 
rddToSave.saveAsObjectFile("/tmp/x") 
val rddLoaded = sc.objectFile[MyObject]("/tmp/x") 

另外,作为zsxwing说,你可以创建一个RDD文件名,并使用map读取每个的内容。如果希望每个文件被读入一个单独的分区,并行化的文件名到单独的分区:

def loadFiles(filenames: Seq[String]): RDD[Object] = { 
    def load(filename: String): Object = { 
    val input = new Input(new FileInputStream(filename)) 
    return kryo.readClassAndObject(input) 
    } 
    val partitions = filenames.length 
    return sc.parallelize(filenames, partitions).map(load) 
} 
+1

你现在也可以使用'sc.wholeTextFiles'。我必须在某个时候更新答案。 –