2

我在pyspark上使用SparkSQL将一些PostgreSQL表存储到DataFrame中,然后构建一个查询,该查询根据date类型的startstop列生成多个时间序列。pyspark上的SparkSQL:如何生成时间序列?

假设my_table包含:

start  | stop  
------------------------- 
2000-01-01 | 2000-01-05 
2012-03-20 | 2012-03-23 

在PostgreSQL它很容易做到这一点:

SELECT generate_series(start, stop, '1 day'::interval)::date AS dt FROM my_table 

,它会生成此表:

dt 
------------ 
2000-01-01 
2000-01-02 
2000-01-03 
2000-01-04 
2000-01-05 
2012-03-20 
2012-03-21 
2012-03-22 
2012-03-23 

,但怎么办那使用普通的SparkSQL?是否有必要使用UDF或一些DataFrame方法?

回答

0

假设你有一个从火花SQL数据框中df,试试这个

from pyspark.sql.functions as F 
from pyspark.sql.types as T 

def timeseriesDF(start, total): 
    series = [start] 
    for i xrange(total-1): 
     series.append(
      F.date_add(series[-1], 1) 
     ) 
    return series 

df.withColumn("t_series", F.udf(
       timeseriesDF, 
       T.ArrayType() 
      ) (df.start, F.datediff(df.start, df.stop)) 
    ).select(F.explode("t_series")).show() 
+0

谢谢你,拉克什。我提出了一个不太详细的解决方案,与您的想法保持一致。我希望它的SparkSQL语法尽可能少用Python代码。我会接受你的答案,但看看我的解决方案。 – pietrop

1

@Rakesh答案是正确的,但我想和大家分享一个更简洁的解决方案:

import datetime 
import pyspark.sql.types 
from pyspark.sql.functions import UserDefinedFunction 

# UDF 
def generate_date_series(start, stop): 
    return [start + datetime.timedelta(days=x) for x in range(0, (stop-start).days + 1)]  

# Register UDF for later usage 
spark.udf.register("generate_date_series", generate_date_series, ArrayType(DateType())) 

# mydf is a DataFrame with columns `start` and `stop` of type DateType() 
mydf.createOrReplaceTempView("mydf") 

spark.sql("SELECT explode(generate_date_series(start, stop)) FROM mydf").show()