例如,我们在过去3年有一个拥有2000个股票代码收盘价的镶木地板文件,我们想要计算每个符号的5日移动平均线。如何在scala中并行化spark中的for循环?
于是我产生火花SQLContext然后
val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache()
要得到符号列表,
val symbols = marketData.select("SYMBOL").distinct().collect()
,这里是for循环:
for (symbol <- symbols) {
marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save()
}
显然,这样做的for循环上的火花很慢,并且每个小结果的save()
也会减慢这个过程(我尝试过在for循环之外定义一个var result
,并将所有输出合并为一个IO操作,但我得到了一个stackoverflow异常),那么如何并行化for循环并优化IO操作呢?
感谢您的回答。然而,'marketdata'包含所有的市场数据(2000个交易日×900天= 1800000行),如果我们在这个没有过滤器(符号)的rdd上滑动(5)'似乎会得到移动平均的错误结果?我是否清楚自己? –
对,我的错。看到我编辑的答案? –
感谢您的耐心。据我所知,如果我们在'map(symbolize)'的rdd上有'{row =>(row.getAs [String](“SYMBOL”),row)}和'reduceByKey'这样的象征符号, 'return我们必须'reduceByKey {case(row_x,row_y)=> ...}'而不是'reduceByKey {case(symbol,days)=> ...}',最后我'groupByKey()'开启'map(symbolize)'返回的rdd和'mapValues(x => x.sliding(5).map(makeAvg))。save()'并且工作正常。再次感谢你的帮助! –