2017-08-01 138 views
0

我有一个输入数据框(ip_df),在该数据帧的数据看起来像如下:时间戳更改为UTC格式Pyspark

id   timestamp_value 
1  2017-08-01T14:30:00+05:30 
2  2017-08-01T14:30:00+06:30 
3  2017-08-01T14:30:00+07:30 

我需要建立一个新的数据框(op_df) ,其中我需要将时间戳值转换为UTC格式。所以最终的输出数据帧将如下所示:

id   timestamp_value 
1  2017-08-01T09:00:00+00:00 
2  2017-08-01T08:00:00+00:00 
3  2017-08-01T07:00:00+00:00 

我想用PySpark来实现它。有人可以帮助我吗?任何帮助都会被蒙蔽。

回答

2

您可以使用解析器tz in dateutil库。
我假设你有一个字符串,你想一个字符串列:

from dateutil import parser, tz 
from pyspark.sql.types import StringType 
from pyspark.sql.functions import col, udf 

# Create UTC timezone 
utc_zone = tz.gettz('UTC') 

# Create UDF function that apply on the column 
# It takes the String, parse it to a timestamp, convert to UTC, then convert to String again 
func = udf(lambda x: parser.parse(x).astimezone(utc_zone).isoformat(), StringType()) 

# Create new column in your dataset 
df = df.withColumn("new_timestamp",func(col("timestamp_value"))) 

它给出了这样的结果:

<pre> 
+---+-------------------------+-------------------------+ 
|id |timestamp_value   |new_timestamp   | 
+---+-------------------------+-------------------------+ 
|1 |2017-08-01T14:30:00+05:30|2017-08-01T09:00:00+00:00| 
|2 |2017-08-01T14:30:00+06:30|2017-08-01T08:00:00+00:00| 
|3 |2017-08-01T14:30:00+07:30|2017-08-01T07:00:00+00:00| 
+---+-------------------------+-------------------------+ 
</pre> 

最后,你可以删除和重命名:

df = df.drop("timestamp_value").withColumnRenamed("new_timestamp","timestamp_value") 
3

如果你绝对需要完全按照指示格式化时间戳,即以时区为“+00:00”,我认为使用UDF作为already suggested是你最好的选择。

但是,如果您可以容忍时区稍有不同的表示,例如无论是“+0000”(不是冒号分隔符)还是“Z”,都可以在不使用UDF的情况下执行此操作,这取决于数据集的大小,对您来说性能可能会更好。

鉴于数据

+---+-------------------------+ 
|id |timestamp_value   | 
+---+-------------------------+ 
|1 |2017-08-01T14:30:00+05:30| 
|2 |2017-08-01T14:30:00+06:30| 
|3 |2017-08-01T14:30:00+07:30| 
+---+-------------------------+ 

以下表示按以下给出:

l = [(1, '2017-08-01T14:30:00+05:30'), (2, '2017-08-01T14:30:00+06:30'), (3, '2017-08-01T14:30:00+07:30')] 
ip_df = spark.createDataFrame(l, ['id', 'timestamp_value']) 

其中timestamp_valueString,你可以做以下(这使用to_timestampsession local timezone support这是在星火介绍2.2):

from pyspark.sql.functions import to_timestamp, date_format 
spark.conf.set('spark.sql.session.timeZone', 'UTC') 
op_df = ip_df.select(
    date_format(
     to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
     "yyyy-MM-dd'T'HH:mm:ssZ" 
    ).alias('timestamp_value')) 

其收率:

+------------------------+ 
|timestamp_value   | 
+------------------------+ 
|2017-08-01T09:00:00+0000| 
|2017-08-01T08:00:00+0000| 
|2017-08-01T07:00:00+0000| 
+------------------------+ 

,或者稍有不同:

op_df = ip_df.select(
    date_format(
     to_timestamp(ip_df.timestamp_value, "yyyy-MM-dd'T'HH:mm:ssXXX"), 
     "yyyy-MM-dd'T'HH:mm:ssXXX" 
    ).alias('timestamp_value')) 

其产生:

+--------------------+ 
|timestamp_value  | 
+--------------------+ 
|2017-08-01T09:00:00Z| 
|2017-08-01T08:00:00Z| 
|2017-08-01T07:00:00Z| 
+--------------------+