2016-08-04 39 views
0

我有一个固定长度的文件(示例如下所示),我想使用SCALA(不是python或java)在Spark中使用DataFrames API读取此文件。 。 使用DataFrame API可以读取textFile,json文件等,但不知道是否有读取固定长度文件的方法。我在网上寻找这个,发现github link,但我为此目的下载spark-fixedwidth-assembly-1.0.jar,但我无法找到任何地方的jar。我完全迷失在这里,需要你的建议和帮助。在Stackoverflow中有几篇文章,但它们与Scala和DataFrame API无关。如何在Spark中使用DataFrame API和SCALA读取固定长度的文件

这里是文件

56 apple  TRUE 0.56 
45 pear  FALSE1.34 
34 raspberry TRUE 2.43 
34 plum  TRUE 1.31 
53 cherry TRUE 1.4 
23 orange FALSE2.34 
56 persimmon FALSE23.2 

每列的固定宽度是3,10,5,4

请提出你的意见。

回答

2

那么......使用子字符串来断行。然后修剪以删除wheitespaces。然后做你想做的。

case class DataUnit(s1: Int, s2: String, s3:Boolean, s4:Double) 

sc.textFile('your_file_path') 
    .map(l => (l.substring(0, 3).trim(), l.substring(3, 13).trim(), l.substring(13,18).trim(), l.substring(18,22).trim())) 
    .map({ case (e1, e2, e3, e4) => DataUnit(e1.toInt, e2, e3.toBoolean, e4.toDouble) }) 
    .toDF 
+0

我试图在REPL但我得到的错误。你可以提一下在REPL中锻炼吗? –

+0

':32:错误:错误的参数数量;预期= 1 val mapRDD = file.map(l =>(l.substring(0,4).trim(),l.substring(4,14).trim(),l.substring(14,19)。 trim(),l.substring(19,23).trim()))。map((e1,e2,e3,e4)=> DataUnit(e1.toInt,e2,e3.toBoolean,e4.toDouble))。 toDF ^ ' –

+0

现在应该修复。尝试在REPL中逐步运行每个映射。 –

1

固定长度格式很旧,我找不到这个格式的好的Scala库...所以我创建了我自己的。

您可以点击此处查看:https://github.com/atais/Fixed-Length

星火用法很简单,你会得到你的对象DataSet

首先,您需要创建对象的说明,FE:

case class Employee(name: String, number: Option[Int], manager: Boolean) 

object Employee { 

    import com.github.atais.util.Read._ 
    import cats.implicits._ 
    import com.github.atais.util.Write._ 
    import Codec._ 

    implicit val employeeCodec: Codec[Employee] = { 
     fixed[String](0, 10) <<: 
     fixed[Option[Int]](10, 13, Alignment.Right) <<: 
     fixed[Boolean](13, 18) 
    }.as[Employee] 
} 

后来只使用解析器:

val input = sql.sparkContext.textFile(file) 
       .filter(_.trim.nonEmpty) 
       .map(Parser.decode[Employee]) 
       .flatMap { 
        case Right(x) => Some(x) 
        case Left(e) => 
         System.err.println(s"Failed to process file $file, error: $e") 
         None 
       } 
sql.createDataset(input) 
相关问题