我想读取Scala中的一个输入文件,我知道该结构,但是我只需要每隔9个条目。到目前为止,我已经成功地读取使用整个事情:Scala只读取文件的某些部分
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
的问题,这给我留下了一个数组,它是巨大(我们谈论的数据20GB)。我不仅看到自己为了在RDD [数组[String]]和数组[String]之间进行转换而被迫编写一些非常难看的代码,但它实质上使我的代码无用。
我试着用
.map()
.flatMap() and
.reduceByKey()
但是没什么居然把我收集的“细胞”,到我需要他们的格式之间的不同的方法和混音。
下面是应该发生的:从我们的服务器读取的文本文件的文件夹,该代码应阅读每个格式文本的“线”:
*---------*
| NASDAQ: |
*---------*
exchange, stock_symbol, date, stock_price_open, stock_price_high, stock_price_low, stock_price_close, stock_volume, stock_price_adj_close
,并只保留STOCK_SYMBOL的保持因为这是我正在计算的标识符。到目前为止,我的尝试是将整个事物转换为一个数组,只将第一个索引从第一个索引中收集到collect_cells var。问题是,根据我的计算和实际生活结果,代码需要335天才能运行(不是玩笑)。
这里是我当前的代码以供参考:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkNum {
def main(args: Array[String]) {
// Do some Scala voodoo
val sc = new SparkContext(new SparkConf().setAppName("Spark Numerical"))
// Set input file as per HDFS structure + input args
val lines = sc.textFile("hdfs://moonshot-ha-nameservice/" + args(0))
val fields = lines.map(line => line.split(","))
var collected_cells:Array[String] = new Array[String](0)
//println("[MESSAGE] Length of CC: " + collected_cells.length)
val divider:Long = 9
val array_length = fields.count/divider
val casted_length = array_length.toInt
val indexedFields = fields.zipWithIndex
val indexKey = indexedFields.map{case (k,v) => (v,k)}
println("[MESSAGE] Number of lines: " + array_length)
println("[MESSAGE] Casted lenght of: " + casted_length)
for(i <- 1 to casted_length) {
println("[URGENT DEBUG] Processin line " + i + " of " + casted_length)
var index = 9 * i - 8
println("[URGENT DEBUG] Index defined to be " + index)
collected_cells :+ indexKey.lookup(index)
}
println("[MESSAGE] collected_cells size: " + collected_cells.length)
val single_cells = collected_cells.flatMap(collected_cells => collected_cells);
val counted_cells = single_cells.map(cell => (cell, 1).reduceByKey{case (x, y) => x + y})
// val result = counted_cells.reduceByKey((a,b) => (a+b))
// val inmem = counted_cells.persist()
//
// // Collect driver into file to be put into user archive
// inmem.saveAsTextFile("path to server location")
// ==> Not necessary to save the result as processing time is recorded, not output
}
}
的底部,目前注释掉,因为我试图调试它,但它作为伪代码,我知道我需要做。我可能想指出,我与斯卡拉完全不熟悉,因此诸如_符号之类的东西混淆了我的生活。
谢谢你的时间。
预处理外星火文件只保留每9号线。对于那部分使用Spark只会让生活变得困难。 –
“纳斯达克”横幅是文件中的头文件吗? – maasg
让我明白:您想要以CSV格式读取包含股票报价的文件(或多个文件),并且您想要提取它包含的(唯一的?)'stock_symbol'的数量? – maasg