
Create a calculated column based on columns of the current row and the previous row

I have some complex logic to implement. I have been trying for a while but still have no clue; please help me check whether it is feasible and, if so, how to do it. Thank you very much!

I have the following Spark SQL DataFrame ('datetime' is increasing, the 'type' values repeat, and each section of a given type always starts with 'flag' = 1):

+---------+-----+----+-----+ 
|datetime |type |flag|value| 
+---------+-----+----+-----+ 
|20170901 |A |1 | 560| 
|20170902 |A |0 | 3456| 
|20170903 |A |0 | 50| 
|20170904 |A |0 | 789| 
...... 
|20170912 |B |1 | 345| 
|20170913 |B |0 | 4510| 
|20170915 |B |0 | 508| 
...... 
|20170919 |C |1 | 45| 
|20170923 |C |0 | 410| 
|20170925 |C |0 | 108| 
...... 
|20171001 |A |1 | 198| 
|20171002 |A |0 | 600| 
|20171005 |A |0 | 675| 
|20171008 |A |0 | 987| 
...... 

I need to create a calculated column based on the previous row and the current row, so that I get a DataFrame like the one below (the calculated field Seq is an increasing section sequence number):

+---------+-----+----+-----+-----+ 
|datetime |type |flag|value| Seq| 
+---------+-----+----+-----+-----+ 
|20170901 |A |1 | 560| 1| 
|20170902 |A |0 | 3456| 1| 
|20170903 |A |0 | 50| 1| 
|20170904 |A |0 | 789| 1| 
...... 
|20170912 |B |1 | 345| 2| 
|20170913 |B |0 | 4510| 2| 
|20170915 |B |0 | 508| 2| 
...... 
|20170919 |C |1 | 45| 3| 
|20170923 |C |0 | 410| 3| 
|20170925 |C |0 | 108| 3| 
...... 
|20171001 |A |1 | 198| 4| 
|20171002 |A |0 | 600| 4| 
|20171005 |A |0 | 675| 4| 
|20171008 |A |0 | 987| 4| 
...... 

Any clue is appreciated. The code I wrote (thanks to https://stackoverflow.com/users/1592191/mrsrinivas):

from pyspark.sql import SQLContext, Row 
from pyspark import SparkConf, SparkContext 
from pyspark.sql.session import SparkSession 
from pyspark.sql.window import Window 
import pyspark.sql.functions as func 
import sys 

conf = SparkConf().setMaster("local[2]") 
conf = conf.setAppName("test") 
sc = SparkContext.getOrCreate(conf=conf) 
spark = SparkSession(sc) 
rdd = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56), 
         (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453), 
         (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108), 
         (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]) 
df = spark.createDataFrame(rdd, ["datetime", "type", "flag", "value"]) 
df.show() 

windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc()).rangeBetween(-sys.maxsize, sys.maxsize) 
df = df.withColumn('Seq', func.dense_rank().over(windowSpec))  # this line raises the exception below 
df.show() 

But I ran into an error: Py4JJavaError: An error occurred while calling o514.withColumn. : org.apache.spark.sql.AnalysisException: Window Frame RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING must match the required frame ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. Any ideas?
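
Reading the message, it seems dense_rank, as a ranking function, only accepts its required default frame (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), so the explicit rangeBetween(-sys.maxsize, sys.maxsize) is rejected. A minimal sketch on the same df that at least avoids the exception, although it still does not produce the Seq I want (it only ranks rows by flag within each type):

windowSpec = Window.partitionBy(df['type']).orderBy(df['flag'].desc()) 
# no explicit frame, so dense_rank falls back to its required default frame 
df = df.withColumn('Seq', func.dense_rank().over(windowSpec)) 
df.show()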

Answers


Hope this helps!

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, monotonically_increasing_id, when, last 
import sys 

#sample data 
df = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56), 
        (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453), 
        (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108), 
        (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]).\ 
    toDF(["datetime", "type", "flag", "value"]) 

df = df.withColumn("row_id",monotonically_increasing_id()) 
w = Window.partitionBy(col("type")).orderBy(col('datetime')) 
df1 = df.withColumn("seq_temp", when(col('flag')==1, col('row_id')).otherwise(None)) 
df1 = df1.withColumn("seq", last('seq_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).\ 
    drop('row_id','seq_temp').\ 
    sort('Seq') 
df1.show() 

The output is:

+--------+----+----+-----+----------+ 
|datetime|type|flag|value|  seq| 
+--------+----+----+-----+----------+ 
|20170901| A| 1| 560|   0| 
|20170902| A| 0| 3560|   0| 
|20170903| A| 0| 50|   0| 
|20170904| A| 0| 56|   0| 
|20170913| B| 0| 4510|   4| 
|20170912| B| 1| 345|   4| 
|20170915| B| 0| 453|   4| 
|20170919| C| 1| 55|8589934592| 
|20170925| C| 0| 108|8589934592| 
|20170923| C| 0| 410|8589934592| 
|20171001| A| 1| 189|8589934595| 
|20171008| A| 0| 956|8589934595| 
|20171005| A| 0| 650|8589934595| 
|20171002| A| 0| 600|8589934595| 
+--------+----+----+-----+----------+ 

The seq values are not a perfect sequence, but they are monotonically increasing.
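
If an exact 1, 2, 3, ... sequence is needed, one possible follow-up (a sketch only, assuming the data is small enough that a single-partition window is acceptable) is to rank the monotonically increasing seq with dense_rank:

from pyspark.sql.functions import dense_rank 
# Window.orderBy without partitionBy moves all rows into one partition, 
# so this is only suitable for modest data sizes 
df2 = df1.withColumn('seq', dense_rank().over(Window.orderBy('seq'))) 
df2.show()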


Thank you very much, Prem! It works well no matter how large the seq numbers get. – peter


@peter Glad it helped :) Maybe you should [accept it as the correct answer](https://stackoverflow.com/help/someone-answers) so that the question gets marked as resolved. – Prem


Sure, Prem. Accepted! – peter


You can use the code below; I have modified it to handle 'A' appearing twice:

from pyspark.sql.window import Window 
from pyspark.sql.functions import col, monotonically_increasing_id, when, last 
from pyspark.sql.functions import lit 
import sys 
import pyspark.sql.functions as func 

#sample data 
df = sc.parallelize([(20170901,"A",1,560), (20170902,"A",0,3560), (20170903,"A",0,50), (20170904,"A",0,56), 
         (20170912,"B",1,345), (20170913,"B",0,4510), (20170915,"B",0,453), 
         (20170919,"C",1,55), (20170923,"C",0,410), (20170925,"C",0,108), 
         (20171001,"A",1,189), (20171002,"A",0,600), (20171005,"A",0,650), (20171008,"A",0,956)]).\
    toDF(["datetime", "type", "flag", "value"]) 

df = df.withColumn("row_id", monotonically_increasing_id()) 
w = Window.partitionBy(col("type")).orderBy(col('datetime')) 
df1 = df.withColumn("seq_temp", when(col('flag')==1, col('row_id')).otherwise(None)) 
df1 = df1.withColumn("seq", last('seq_temp', True).over(w.rowsBetween(-sys.maxsize, 0))).sort('Seq') 
# lit(0) gives every row the same seq_item value, so the window below spans the whole DataFrame 
r = df1.withColumn('seq_item', lit(0)) 
windowSpec = Window.partitionBy(r['seq_item']).orderBy(r['seq']) 
# dense_rank turns the monotonically increasing seq into consecutive 1, 2, 3, ... 
s = r.withColumn('seq_1', func.dense_rank().over(windowSpec)).drop('seq_temp','seq','seq_item','row_id') 
s.show() 

+--------+----+----+-----+--------+-----+ 
|datatime|type|flag|value|seq_item|seq_1| 
+--------+----+----+-----+--------+-----+ 
|20170901| A| 1| 560|  0| 1| 
|20170902| A| 0| 3560|  0| 1| 
|20170903| A| 0| 50|  0| 1| 
|20170904| A| 0| 56|  0| 1| 
|20170912| B| 1| 345|  0| 2| 
|20170913| B| 0| 4510|  0| 2| 
|20170915| B| 0| 453|  0| 2| 
|20170919| C| 1| 55|  0| 3| 
|20170923| C| 0| 410|  0| 3| 
|20170925| C| 0| 108|  0| 3| 
|20171001| A| 1| 189|  0| 4| 
|20171002| A| 0| 600|  0| 4| 
|20171005| A| 0| 650|  0| 4| 
|20171008| A| 0| 956|  0| 4| 
+--------+----+----+-----+--------+-----+ 

Thanks. But the fourth section (in the 'type' column) is 'A' rather than 'D', and that is what makes it complicated. – peter
