2016-05-03 25 views
2

例如,我们在过去3年有一个拥有2000个股票代码收盘价的镶木地板文件,我们想要计算每个符号的5日移动平均线。如何在scala中并行化spark中的for循环?

于是我产生火花SQLContext然后

val marketData = sqlcontext.sql("select DATE, SYMBOL, PRICE from stockdata order by DATE").cache() 

要得到符号列表,

val symbols = marketData.select("SYMBOL").distinct().collect() 

,这里是for循环:

for (symbol <- symbols) { 
    marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save() 
} 

显然,这样做的for循环上的火花很慢,并且每个小结果的save()也会减慢这个过程(我尝试过在for循环之外定义一个var result,并将所有输出合并为一个IO操作,但我得到了一个stackoverflow异常),那么如何并行化for循环并优化IO操作呢?

回答

2

您编写的程序在驱动程序(“主”)中运行的spark节点。如果您在并行结构(RDD)上运行,则此程序中的表达式只能并行化。

试试这个:

marketdata.rdd.map(symbolize).reduceByKey{ case (symbol, days) => days.sliding(5).map(makeAvg) }.foreach{ case (symbol,averages) => averages.save() } 

其中symbolize需要符号x天的行并返回一个元组(符号,日)。

+0

感谢您的回答。然而,'marketdata'包含所有的市场数据(2000个交易日×900天= 1800000行),如果我们在这个没有过滤器(符号)的rdd上滑动(5)'似乎会得到移动平均的错误结果?我是否清楚自己? –

+0

对,我的错。看到我编辑的答案? –

+0

感谢您的耐心。据我所知,如果我们在'map(symbolize)'的rdd上有'{row =>(row.getAs [String](“SYMBOL”),row)}和'reduceByKey'这样的象征符号, 'return我们必须'reduceByKey {case(row_x,row_y)=> ...}'而不是'reduceByKey {case(symbol,days)=> ...}',最后我'groupByKey()'开启'map(symbolize)'返回的rdd和'mapValues(x => x.sliding(5).map(makeAvg))。save()'并且工作正常。再次感谢你的帮助! –

2

对于答案的第一部分,我不同意卡洛斯。该程序不在驱动程序(“主”)中运行。

回路不运行顺序,但对于每个符号的执行:

marketData.filter(symbol).rdd.sliding(5).map(...calculating the avg...).save() 

并联因为markedData完成是一个Spark数据帧,它是分布式的。