2017-06-06 46 views
1

我有以下结构快行间计算

groupId | time | value 
1   0  2 
1   1  1 
1   2  4 
2   0  6 
2   1  2 

的CSV文件(> 3GB),并希望增加一列(值t-1),包含值 - 在同一组中 - 的“行”早时间步骤的:

groupId | time | value | value t-1 
1   0  2   - 
1   1  1   2 
1   2  4   1 
2   0  6   - 
2   1  2   6 

我想象的昂贵部分是要搜索的前一行。不知何故,它似​​乎是一个没有减少地图减少的工作 - 如果这是有道理的。但据我所知,我不能确定同一工作人员拥有同一组的所有数据。

Spark是否是正确的工具?

我的最佳替代的解决方案是分割文件分成多个文件(每组一个),只是运行排序并依次Python脚本的多个实例增加了值t-1值

+0

你要做到这几个文件?如果你只需要修复一个文件,那么做一个简单的for循环,然后等待(可能很多)来解析你的3GB数据 – JBernardo

+0

@JBernardo现在我只需要做一次,但是有可能会变成这样更常用的用例。这个循环是在1到2天的球场中的某个地方 - 只是看到只有一个繁忙的核心而感到伤心...... – bam

回答

2

这可以是通过使用如下所示的火花窗口功能来实现。

import org.apache.spark.sql.expressions.Window 

val df = Seq((1,0,2), (1,1,1), (1,2,4), (2,0,6), (2,1,2)).toDF("groupId", "time", "value") 

val result = df.withColumn("value_t-1", sum($"value").over(Window.partitionBy("groupId").orderBy("time").rowsBetween(-1,-1))) 

输出:

scala> result.show() 
+-------+----+-----+---------+ 
|groupId|time|value|value_t-1| 
+-------+----+-----+---------+ 
|  1| 0| 2|  null| 
|  1| 1| 1|  2| 
|  1| 2| 4|  1| 
|  2| 0| 6|  null| 
|  2| 1| 2|  6| 
+-------+----+-----+---------+ 

Python版本

>>> from pyspark.sql.window import Window 
>>> import pyspark.sql.functions as func 
>>> df = spark.createDataFrame([(1,0,2), (1,1,1), (1,2,4), (2,0,6), (2,1,2)], ["groupId", "time", "value"]) 
>>> result = df.withColumn("value_t-1", func.sum(df.value).over(Window.partitionBy(df.groupId).orderBy(df.time).rowsBetween(-1,-1))) 
>>> result.show() 
+-------+----+-----+---------+ 
|groupId|time|value|value_t-1| 
+-------+----+-----+---------+ 
|  1| 0| 2|  null| 
|  1| 1| 1|  2| 
|  1| 2| 4|  1| 
|  2| 0| 6|  null| 
|  2| 1| 2|  6| 
+-------+----+-----+---------+ 
+0

谢谢!在一台机器上(8个内核)在不到25分钟的时间内处理完整个文件(9个这样的列) – bam