2016-01-22 119 views
7

我已经加载从一个SQLServer表中的数据帧。它看起来像这样:集团火花数据帧按日期

>>> df.show() 
+--------------------+----------+ 
|   timestamp| Value | 
+--------------------+----------+ 
|2015-12-02 00:10:...|  652.8| 
|2015-12-02 00:20:...|  518.4| 
|2015-12-02 00:30:...|  524.6| 
|2015-12-02 00:40:...|  382.9| 
|2015-12-02 00:50:...|  461.6| 
|2015-12-02 01:00:...|  476.6| 
|2015-12-02 01:10:...|  472.6| 
|2015-12-02 01:20:...|  353.0| 
|2015-12-02 01:30:...|  407.9| 
|2015-12-02 01:40:...|  475.9| 
|2015-12-02 01:50:...|  513.2| 
|2015-12-02 02:00:...|  569.0| 
|2015-12-02 02:10:...|  711.4| 
|2015-12-02 02:20:...|  457.6| 
|2015-12-02 02:30:...|  392.0| 
|2015-12-02 02:40:...|  459.5| 
|2015-12-02 02:50:...|  560.2| 
|2015-12-02 03:00:...|  252.9| 
|2015-12-02 03:10:...|  228.7| 
|2015-12-02 03:20:...|  312.2| 
+--------------------+----------+ 

现在我想组(和和)的小时值(或天或一个月或......),但我真的没有一个关于如何能线索我这样做。

这就是我如何加载数据帧。但我认为这不是正确的做法:

query = """ 
SELECT column1 AS timestamp, column2 AS value 
FROM table 
WHERE blahblah 
""" 

sc = SparkContext("local", 'test') 
sqlctx = SQLContext(sc) 

df = sqlctx.load(source="jdbc", 
       url="jdbc:sqlserver://<CONNECTION_DATA>", 
       dbtable="(%s) AS alias" % query) 

可以吗?

回答

11

由于1.5.0火花提供像dayofmonthhourmonthyear它可以在日期和时间戳操作多个功能。所以,如果timestampTimestampType所有你需要的是一个正确的表达。例如:

from pyspark.sql.functions import hour, mean 

(df 
    .groupBy(hour("timestamp").alias("hour")) 
    .agg(mean("value").alias("mean")) 
    .show()) 

## +----+------------------+ 
## |hour|    mean| 
## +----+------------------+ 
## | 0|508.05999999999995| 
## | 1| 449.8666666666666| 
## | 2| 524.9499999999999| 
## | 3|264.59999999999997| 
## +----+------------------+ 

预1.5.0你最好的选择是使用HiveContext和蜂巢的UDF要么selectExpr

df.selectExpr("year(timestamp) AS year", "value").groupBy("year").sum() 

## +----+---------+----------+ 
## |year|SUM(year)|SUM(value)| 
## +----+---------+----------+ 
## |2015| 40300| 9183.0| 
## +----+---------+----------+ 

或原始SQL:

df.registerTempTable("df") 

sqlContext.sql(""" 
    SELECT MONTH(timestamp) AS month, SUM(value) AS values_sum 
    FROM df 
    GROUP BY MONTH(timestamp)""") 

只要记住,聚合由Spark执行,而不是下推到外部源。通常这是一种理想的行为,但在某些情况下,您可能更喜欢将聚合作为子查询来限制数据传输。

+0

是他们的任何功能monthofyear? –

0

此外,您还可以使用DATE_FORMAT创造任何你想要的时间段。 GROUPBY特定日期:

from pyspark.sql import functions as F 
 

 
df.select(F.date_format('timestamp','yyyy-MM-dd').alias('day')).groupby('day').count().show()

GROUPBY具体月份(只是更改格式):

df.select(F.date_format('timestamp','yyyy-MM').alias('month')).groupby('month').count().show()