Create a calculated column based on columns of the current row and the previous row

I have a complex piece of logic to implement. I have tried for a while but still have no clue — please help me check whether it is feasible and how to do it. Thank you very much!!

I have the following Spark SQL DataFrame (datetime is increasing, "type" repeats, and each section (of a distinct type) always starts with 'flag'=1):
+---------+-----+----+-----+
|datetime |type |flag|value|
+---------+-----+----+-----+
|20170901 |A |1 | 560|
|20170902 |A |0 | 3456|
|20170903 |A |0 | 50|
|20170904 |A |0 | 789|
......
|20170912 |B |1 | 345|
|20170913 |B |0 | 4510|
|20170915 |B |0 | 508|
......
|20170919 |C |1 | 45|
|20170923 |C |0 | 410|
|20170925 |C |0 | 108|
......
|20171001 |A |1 | 198|
|20171002 |A |0 | 600|
|20171005 |A |0 | 675|
|20171008 |A |0 | 987|
......
I need to create a calculated column based on the previous row and the current row, so that I get a DataFrame like this (the computed field Seq denotes the sequence number of the increasing sections):
+---------+-----+----+-----+-----+
|datetime |type |flag|value| Seq|
+---------+-----+----+-----+-----+
|20170901 |A |1 | 560| 1|
|20170902 |A |0 | 3456| 1|
|20170903 |A |0 | 50| 1|
|20170904 |A |0 | 789| 1|
......
|20170912 |B |1 | 345| 2|
|20170913 |B |0 | 4510| 2|
|20170915 |B |0 | 508| 2|
......
|20170919 |C |1 | 45| 3|
|20170923 |C |0 | 410| 3|
|20170925 |C |0 | 108| 3|
......
|20171001 |A |1 | 198| 4|
|20171002 |A |0 | 600| 4|
|20171005 |A |0 | 675| 4|
|20171008 |A |0 | 987| 4|
......
Any clue is appreciated. The code I wrote (thanks to https://stackoverflow.com/users/1592191/mrsrinivas):
from pyspark.sql import SQLContext, Row
from pyspark import SparkConf, SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as func
import sys
conf = SparkConf().setMaster("local[2]")
conf = conf.setAppName("test")
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
rdd = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56),
(20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453),
(20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108),
(20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)])
df = spark.createDataFrame(rdd, ["datetime", "type", "flag", "value"])
df.show()
windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc()).rangeBetween(-sys.maxsize, sys.maxsize)
df = df.withColumn('Seq', func.dense_rank().over(windowSpec))
df.show()
But I ran into an error: Py4JJavaError: An error occurred while calling o514.withColumn. : org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW; Any ideas?
Thank you so much, Prem! It works well no matter how big the Seq number gets. – peter
@peter Glad it helped :) Perhaps you should [accept it as the correct answer](https://stackoverflow.com/help/someone-answers) so that the question is marked as resolved. – Prem
Sure, Prem. Accepted! – peter