Find the minimum and maximum date in an RDD in PySpark

I am using Spark with IPython and have an RDD that, when printed, contains data in this format:
print(rdd1.collect())
[u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00',....]
Each item is a timestamp string, and I want to find the minimum and maximum values in this RDD. How can I do that?
For example, you can use the aggregate function (for an explanation of how it works, see: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?)
from datetime import datetime

rdd = sc.parallelize([
    u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00'])

def seq_op(acc, x):
    """ Given a tuple (min-so-far, max-so-far) and a date string
    return a tuple (min-including-current, max-including-current)
    """
    d = datetime.strptime(x, '%Y-%m-%d %H:%M:%S')
    return (min(d, acc[0]), max(d, acc[1]))

def comb_op(acc1, acc2):
    """ Given a pair of tuples (min-so-far, max-so-far)
    return a tuple (min-of-mins, max-of-maxs)
    """
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

# (initial-min <- max-date, initial-max <- min-date)
rdd.aggregate((datetime.max, datetime.min), seq_op, comb_op)
## (datetime.datetime(2010, 12, 8, 0, 0), datetime.datetime(2012, 5, 13, 0, 0))
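To make the roles of the two functions more concrete, here is a minimal plain-Python sketch of what aggregate does with them. It is an illustration, not Spark's actual code: `local_aggregate` is a hypothetical helper, and the split of the sample data into two partitions is an assumption made only for the example. Each partition is first folded with `seq_op` starting from the zero value, and the per-partition results are then merged with `comb_op`.

```python
from datetime import datetime
from functools import reduce

FMT = '%Y-%m-%d %H:%M:%S'

def seq_op(acc, x):
    # Fold one date string into the (min-so-far, max-so-far) tuple.
    d = datetime.strptime(x, FMT)
    return (min(d, acc[0]), max(d, acc[1]))

def comb_op(acc1, acc2):
    # Merge two (min, max) tuples that came from different partitions.
    return (min(acc1[0], acc2[0]), max(acc1[1], acc2[1]))

def local_aggregate(partitions, zero, seq_op, comb_op):
    """Plain-Python stand-in for RDD.aggregate (hypothetical helper, not a Spark API)."""
    # Step 1: fold every partition on its own, starting from the zero value.
    partials = [reduce(seq_op, part, zero) for part in partitions]
    # Step 2: merge the per-partition results, again starting from the zero value.
    return reduce(comb_op, partials, zero)

# Assumed split of the sample data into two partitions.
partitions = [
    [u'2010-12-08 00:00:00', u'2010-12-18 01:20:00'],
    [u'2012-05-13 00:00:00'],
]

# The zero value starts the minimum at the largest possible date and the
# maximum at the smallest, so any real date replaces both.
zero = (datetime.max, datetime.min)
result = local_aggregate(partitions, zero, seq_op, comb_op)
print(result)
```

The zero value is chosen so that it never wins a comparison against real data, which is why it can safely be reused in both steps and why an empty partition does not affect the result.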
Or with DataFrames:
from pyspark.sql import Row
# Caution: importing min and max here shadows Python's built-ins of the same name
from pyspark.sql.functions import from_unixtime, unix_timestamp, min, max

row = Row("ts")
df = rdd.map(row).toDF()

df.withColumn("ts", unix_timestamp("ts")).agg(
    from_unixtime(min("ts")).alias("min_ts"),
    from_unixtime(max("ts")).alias("max_ts")
).show()
## +-------------------+-------------------+
## |             min_ts|             max_ts|
## +-------------------+-------------------+
## |2010-12-08 00:00:00|2012-05-13 00:00:00|
## +-------------------+-------------------+
If the RDD consists of datetime objects, what is wrong with simply using
rdd1.min()
rdd1.max()
This example works for me:
rdd = sc.parallelize([u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00'])
from datetime import datetime
rddT = rdd.map(lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S")).cache()
print(rddT.min())
print(rddT.max())
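As a side note, and only under an assumption the question does not confirm: if every string is zero-padded and uses exactly the same 'YYYY-MM-DD HH:MM:SS' layout, plain string comparison orders the values chronologically, so taking the min and max of the raw strings should agree with the parsed result. The sketch below checks this on the sample data in plain Python, without Spark; the names `stamps` and `agrees` are made up for the illustration.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"
stamps = [u'2010-12-08 00:00:00', u'2010-12-18 01:20:00', u'2012-05-13 00:00:00']

# Min and max taken directly on the strings (lexicographic order).
str_min, str_max = min(stamps), max(stamps)

# Min and max taken after parsing to datetime objects.
parsed = [datetime.strptime(s, FMT) for s in stamps]
dt_min, dt_max = min(parsed), max(parsed)

# Compare the two approaches on this sample.
agrees = (datetime.strptime(str_min, FMT) == dt_min
          and datetime.strptime(str_max, FMT) == dt_max)
print(agrees)
```

Parsing first, as in the example above, remains the safer choice when the format might vary between records.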
Would it be possible for you to add some comments on how the code works (these two functions in particular), so that I can understand it properly?
http://stackoverflow.com/a/31082341/1560062 – zero323