
I have return data in Spark in this format:

Date       | Return
01/01/2015 |  0.0
02/02/2015 | -0.02
03/02/2015 |  0.05
04/02/2015 |  0.07

I would like to process this and add a column to the DataFrame that holds the compounded return. The compounded return is computed as follows (restated as a formula below):

  • 1 for the first row.

  • (1 + Return(i)) * Compounded(i-1) for every subsequent row.
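Equivalently, writing r_i for the i-th Return and c_i for the Compounded value, the new column is a running product of gross returns:

    c_1 = 1, \qquad c_i = (1 + r_i)\, c_{i-1} \quad (i > 1), \qquad \text{so} \quad c_n = \prod_{i=2}^{n} (1 + r_i)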

So my DataFrame would end up as:

Date       | Return | Compounded
01/01/2015 |  0.0   | 1.0
02/02/2015 | -0.02  | 1.0*(1-0.2)=0.8
03/02/2015 |  0.05  | 0.8*(1+0.05)=0.84
04/02/2015 |  0.07  | 0.84*(1+0.07)=0.8988

An answer would be greatly appreciated.


Is your sample correct? In the second row, shouldn't the value of Compounded be 0.98? –


Yes, it should be 0.98. Sorry for the mistake. –

Answers


You can also create a custom aggregation function and use it inside a window function.

Something like this (written freehand, so there may be some mistakes):

package com.myuadfs

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class MyUDAF() extends UserDefinedAggregateFunction {

    // one input column: the period's return
    def inputSchema: StructType = StructType(Array(StructField("Return", DoubleType)))

    // one buffer slot: the running compounded value
    def bufferSchema: StructType = StructType(Array(StructField("compounded", DoubleType)))

    def dataType: DataType = DoubleType

    def deterministic: Boolean = true

    def initialize(buffer: MutableAggregationBuffer): Unit = {
        buffer(0) = 1.0 // set compounded to 1
    }

    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        buffer(0) = buffer.getDouble(0) * (input.getDouble(0) + 1)
    }

    // merge combines two partially aggregated buffers; for a running
    // product the correct combination is multiplication. Inside a window
    // function merge should never be called, but it is implemented
    // correctly anyway in case the UDAF is reused in a regular aggregate.
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) * buffer2.getDouble(0)
    }

    def evaluate(buffer: Row): Double = {
        buffer.getDouble(0)
    }
}

Now you can use it inside a window function. Something like this:

import org.apache.spark.sql.expressions.Window

val compound = new MyUDAF()
val windowSpec = Window.orderBy("Date")
val newDF = df.withColumn("Compounded", compound(df("Return")).over(windowSpec))
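For reference, here is a minimal end-to-end sketch (my own illustration; it assumes a SparkSession in scope as spark plus the MyUDAF class above, and uses the sample data from the question):

import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = Seq(
    ("01/01/2015", 0.0),
    ("02/02/2015", -0.02),
    ("03/02/2015", 0.05),
    ("04/02/2015", 0.07)
).toDF("Date", "Return")

val compound = new MyUDAF()
// Note: ordering by the raw date string only works because this sample
// happens to sort correctly; real code should parse the dates first.
val windowSpec = Window.orderBy("Date")
df.withColumn("Compounded", compound(df("Return")).over(windowSpec)).show()
// Expected Compounded values (using the corrected 0.98 from the comments):
// 1.0, 0.98, 1.029, 1.10103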

Note that this entire calculation has to fit in a single partition, so if your data is too large you will run into problems. That said, this kind of operation is normally performed after partitioning by some key (i.e., adding a partitionBy to the window), in which case only the rows belonging to a single key have to fit in one partition.
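As an illustration of that point, here is a sketch with a hypothetical grouping column named "ticker" (not part of the question's data), reusing compound and df from the snippets above; each compounding chain then runs per key, so only one key's rows have to fit in a partition:

import org.apache.spark.sql.expressions.Window

// "ticker" is an assumed key column for the sake of the example.
val perKeySpec = Window.partitionBy("ticker").orderBy("Date")
val perKeyDF = df.withColumn("Compounded", compound(df("Return")).over(perKeySpec))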


Thank you so much. This code helped me a lot! –


First, we define a function f(line) (please suggest a better name!) to process the lines.

def f(line):
    global firstLine
    global last_compounded
    if line[0] == 'Date':
        # header row: the next row processed is the first data row
        firstLine = True
        return (line[0], line[1], 'Compounded')
    if firstLine:
        last_compounded = 1
        firstLine = False
    else:
        last_compounded = (1 + float(line[1])) * last_compounded
    return (line[0], line[1], last_compounded)

Using two global variables (can this be improved?), we keep the Compounded(i-1) value and track whether we are processing the first data line.

With the data in some_file, a solution could be:

rdd = sc.textFile('some_file').map(lambda l: l.split()) 
r1 = rdd.map(lambda l: f(l)) 

rdd.collect()
[[u'Date', u'Return'], [u'01/01/2015', u'0.0'], [u'02/02/2015', u'-0.02'], [u'03/02/2015', u'0.05'], [u'04/02/2015', u'0.07']]
r1.collect()
[(u'Date', u'Return', 'Compounded'), (u'01/01/2015', u'0.0', 1.0), (u'02/02/2015', u'-0.02', 0.98), (u'03/02/2015', u'0.05', 1.05), (u'04/02/2015', u'0.07', 1.1235000000000002)]


Are you sure this works when you have multiple (local) executors? The globals will be different on each executor... –


@AssafMendelson, good point! Back to the drawing board! –


I tried implementing this, but it failed when I had many days of data. Maybe because of what Assaf pointed out. –