2016-12-16 49 views
1

我想读取Scala中的一个输入文件,我知道该结构,但是我只需要每隔9个条目。到目前为止,我已经成功地读取使用整个事情:Scala只读取文件的某些部分

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

的问题,这给我留下了一个数组,它是巨大(我们谈论的数据20GB)。我不仅看到自己为了在RDD [数组[String]]和数组[String]之间进行转换而被迫编写一些非常难看的代码,但它实质上使我的代码无用。

我试着用

.map() 
.flatMap() and 
.reduceByKey() 

但是没什么居然把我收集的“细胞”,到我需要他们的格式之间的不同的方法和混音。

下面是应该发生的:从我们的服务器读取的文本文件的文件夹,该代码应阅读每个格式文本的“线”:

*---------* 
| NASDAQ: | 
*---------* 
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

,并只保留STOCK_SYMBOL的保持因为这是我正在计算的标识符。到目前为止,我的尝试是将整个事物转换为一个数组,只将第一个索引从第一个索引中收集到collect_cells var。问题是,根据我的计算和实际生活结果,代码需要335天才能运行(不是玩笑)。

这里是我当前的代码以供参考:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object SparkNum { 


    def main(args: Array[String]) { 

    // Do some Scala voodoo 
    val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical")) 

    // Set input file as per HDFS structure + input args 
    val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
    val fields = lines.map(line => line.split(",")) 
    var collected_cells:Array[String] = new Array[String](0) 

    //println("[MESSAGE] Length of CC: " + collected_cells.length) 

    val divider:Long = 9 
    val array_length = fields.count/divider 
    val casted_length = array_length.toInt 

    val indexedFields = fields.zipWithIndex 
    val indexKey = indexedFields.map{case (k,v) => (v,k)} 

    println("[MESSAGE] Number of lines: " + array_length) 
    println("[MESSAGE] Casted lenght of: " + casted_length) 



    for(i <- 1 to casted_length) { 

     println("[URGENT DEBUG] Processin line " + i + " of " + casted_length) 

     var index = 9 * i - 8 

     println("[URGENT DEBUG] Index defined to be " + index) 

     collected_cells :+ indexKey.lookup(index) 

    } 



    println("[MESSAGE] collected_cells size: " + collected_cells.length) 



    val single_cells = collected_cells.flatMap(collected_cells => collected_cells); 
    val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y}) 
    // val result = counted_cells.reduceByKey((a,b) => (a+b)) 

    // val inmem = counted_cells.persist() 
    // 
    // // Collect driver into file to be put into user archive 
    // inmem.saveAsTextFile("path to server location") 

    // ==> Not necessary to save the result as processing time is recorded, not output 


    } 

} 

的底部,目前注释掉,因为我试图调试它,但它作为伪代码,我知道我需要做。我可能想指出,我与斯卡拉完全不熟悉,因此诸如_符号之类的东西混淆了我的生活。

谢谢你的时间。

+0

预处理外星火文件只保留每9号线。对于那部分使用Spark只会让生活变得困难。 –

+0

“纳斯达克”横幅是文件中的头文件吗? – maasg

+0

让我明白:您想要以CSV格式读取包含股票报价的文件(或多个文件),并且您想要提取它包含的(唯一的?)'stock_symbol'的数量? – maasg

回答

2

有一些需要澄清的问题概念:

当我们执行这段代码:

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val fields = lines.map(line => line.split(",")) 

这并不造成巨大的数据大小的数组。该表达式代表基础数据的转换。它可以进一步转化,直到我们将数据减少到我们期望的信息集。

在这种情况下,我们希望有一个记录的stock_symbol场编码的CSV:

exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close 

我也要去假设数据文件包含这样的横幅:

*---------* 
| NASDAQ: | 
*---------* 

我们要做的第一件事就是删除看起来像这个横幅的东西。实际上,我将假设第一个字段是以字母数字字符开头的证券交易所的名称。我们将做到这一点,我们做任何分裂之前,导致:

val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields = validLines.map(line => line.split(",")) 

它有助于写类型的变量,有胸怀,我们有我们所期望的数据类型的和平。随着我们Scala技能的进步,可能变得不那么重要。让我们改写以上类型的表达式:

val lines: RDD[String] = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0)) 
val validLines: RDD[String] = lines.filter(line => !line.isEmpty && line.head.isLetter) 
val fields: RDD[Array[String]] = validLines.map(line => line.split(",")) 

我们感兴趣的是stock_symbol领域,这位置上是元素#1基于0阵列:

val stockSymbols:RDD[String] = fields.map(record => record(1)) 

如果我们要算符号,所有剩下的就是发出计数:

val totalSymbolCount = stockSymbols.count() 

因为我们有所有记录一个条目这不是非常有帮助。稍微有趣的问题是:

我们有多少个不同的股票代码?

val uniqueStockSymbols = stockSymbols.distinct.count() 

我们每个符号有多少条记录?

val countBySymbol = stockSymbols.map(s => (s,1)).reduceByKey(_+_) 

在星火2.0,为Dataframes和数据集CSV支持可出鉴于盒 的,我们的数据不具有与字段名称的标题行(什么大的数据集通常的),我们将需要提供的列名:

val stockDF = sparkSession.read.csv("/tmp/quotes_clean.csv").toDF("exchange", "symbol", "date", "open", "close", "volume", "price") 

现在我们可以回答我们的问题很简单:

val uniqueSymbols = stockDF.select("symbol").distinct().count 
val recordsPerSymbol = stockDF.groupBy($"symbol").agg(count($"symbol")) 
+0

嗨maasg,非常感谢您的回答!我在适用的地方更改了我的代码,但是当我在.reduceByKey()中使用_时,Maven抱怨“扩展函数缺少参数类型”。现在我会尝试创建一个新行,或者我会尝试使用旧的布局.reduceByKey((a,b)=>(a + b))。干杯。 – Synaxr

+1

@Synaxr查看Spark Notebook中的代码:https://gist.github.com/maasg/7b8a4991ba9e2c236ddd8dfd823352cc – maasg