2017-02-13 80 views
2

我有以下数据框:如何添加运行标识新列星火数据帧(pyspark)

timestamp \t  sum 
 
31/01/2017 09:00 \t 0 
 
31/01/2017 10:00 \t 0 
 
31/01/2017 11:00 \t 0 
 
31/01/2017 12:00 \t 2 
 
31/01/2017 13:00 \t 2 
 
31/01/2017 14:00 \t 2 
 
31/01/2017 15:00 \t 11

,并想添加一个新的ID列 - 只是一个流水号像即:

+----------------+---+---------+ 
 
|  timestamp|sum|running_id| 
 
+----------------+---+---------+ 
 
|2017-01-31 09:00| 0|  0| 
 
|2017-01-31 10:00| 0|  1| 
 
|2017-01-31 11:00| 0|  2| 
 
|2017-01-31 12:00| 2|  3| 
 
|2017-01-31 13:00| 2|  4| 
 
|2017-01-31 14:00| 2|  5| 
 
|2017-01-31 15:00| 11|  6|

我做了这样的:

sub_data_spark = sub_data_spark.rdd.zipWithIndex().map(lambda x: (x[0][0],x[0][1],x[1])).toDF(sub_data_spark.columns+["running_id"])

有些人可以为一个 “干净” 的方式咨询?

感谢, 鲍里斯

回答

2

尝试。也可以使用PARTITION BY子句。

+0

如果没有添加PartitionBy子句,这将基本上将所有数据混洗到单个分区,这不会对并行性有好处。 –

+0

谢谢,这个工程。如何使用PartitionBy并保持时间戳的有序性(行的顺序应该保持不变) – Boris

+0

从表中选择*,row_Number()over(按TO_DATE分区(timestamp)顺序)。这会将不同日期的所有数据发送到不同的分区。但是对于所有分区计数将从1开始。这种方法有其优点和缺点。根据你的用例使用它。 –

2

到的唯一方法没有zipWithIndex或zipWithUniqueId你应该使用功能monotonically_increasing_id

此功能的工作原理是这样的:

产生单调递增64列位整数。

生成的ID保证是单调递增的,并且 是唯一的,但不是连续的。当前的实施将 分区ID放在高31位中,并将每个 分区内的记录号放在低33位中。假设数据帧 的分区少于10亿个,并且每个分区的记录少于8个 。

因此,对于你的情况,你可以使用这样的:

sub_data_spark.withColumn('Id', monotonically_increasing_id()).show() 

这将返回给您的唯一ID为您的模型。但它不会在0开始,并使用select *, row_Number() over (order by sum) from table

或基于逻辑的任何列将不连续

+0

谢谢,我确实寻找一个单调增加和唯一的数字=>行号 – Boris

+0

它确实有效,但如果您想要32位整数而不是64位,情况如何?这里的截断会导致问题。 –

+0

根据Spark代码,您将使用64位长的:https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst /expressions/MonotonicallyIncreasingID.scala#L48 –