2016-07-27

Finding the closest time between two tables in Spark. I'm using pyspark, and I have two dataframes like this:

user time                bus
A    2016/07/18 12:00:00 1
B    2016/07/19 12:00:00 2
C    2016/07/20 12:00:00 3

bus time                stop
1   2016/07/18 11:59:40 sA
1   2016/07/18 11:59:50 sB
1   2016/07/18 12:00:05 sC
2   2016/07/19 11:59:40 sB
2   2016/07/19 12:00:10 sC
3   2016/07/20 11:59:55 sD
3   2016/07/20 12:00:10 sE

Now, for each user, I want to find the stop in the second table that matches the bus number and is closest to the user's reported time.

For example, in table 1, user A reported at 2016/07/18 12:00:00 that he was on bus 1. According to the second table, the closest time for bus 1 is 2016/07/18 12:00:05 (the third record), so user A is now at stop sC.

The desired output should look like this:

user time                bus stop
A    2016/07/18 12:00:00 1   sC
B    2016/07/19 12:00:00 2   sC
C    2016/07/20 12:00:00 3   sD

I have already converted the times into timestamps, so the only remaining problem is to find, among the rows with an equal bus number, the closest timestamp.
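For a single user row, the "closest timestamp among rows with an equal bus number" logic can be sketched in plain Python (a minimal illustration with the sample data above; `tm` is a helper introduced here to parse the timestamp format used in the tables):

```python
from datetime import datetime

def tm(s):
    # Parse the "YYYY/MM/DD HH:MM:SS" format used in the tables above
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# User A's report time, and the rows of the second table for bus 1
user_time = tm("2016/07/18 12:00:00")
bus_rows = [
    (1, tm("2016/07/18 11:59:40"), "sA"),
    (1, tm("2016/07/18 11:59:50"), "sB"),
    (1, tm("2016/07/18 12:00:05"), "sC"),
]

# Pick the row whose time is closest (in absolute seconds) to the report time
closest = min(bus_rows, key=lambda r: abs((r[1] - user_time).total_seconds()))
print(closest[2])  # sC
```

The differences are 20 s, 10 s, and 5 s, so the third record wins; the question is how to express this per-group "smallest difference" selection in Spark.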

Since I'm not familiar with SQL, I tried to use a map function to find the closest time and stop, but that means calling sqlContext.sql inside the map function, and Spark does not seem to allow that:

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

So how can I write a SQL query to get the correct output?

Answer

This can be done using a window function.

from datetime import datetime

from pyspark.sql import Row, functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window

def tm(s):
    return datetime.strptime(s, "%Y/%m/%d %H:%M:%S")

# set up the sample data
userTime = [Row(user="A", time=tm("2016/07/18 12:00:00"), bus=1),
            Row(user="B", time=tm("2016/07/19 12:00:00"), bus=2),
            Row(user="C", time=tm("2016/07/20 12:00:00"), bus=3)]

busTime = [Row(bus=1, time=tm("2016/07/18 11:59:40"), stop="sA"),
           Row(bus=1, time=tm("2016/07/18 11:59:50"), stop="sB"),
           Row(bus=1, time=tm("2016/07/18 12:00:05"), stop="sC"),
           Row(bus=2, time=tm("2016/07/19 11:59:40"), stop="sB"),
           Row(bus=2, time=tm("2016/07/19 12:00:10"), stop="sC"),
           Row(bus=3, time=tm("2016/07/20 11:59:55"), stop="sD"),
           Row(bus=3, time=tm("2016/07/20 12:00:10"), stop="sE")]

# create the DataFrames
userDf = sc.parallelize(userTime).toDF().alias("usertime")
busDf = sc.parallelize(busTime).toDF().alias("bustime")

# inner join on the bus number
joinedDF = userDf.join(busDf, col("usertime.bus") == col("bustime.bus"), "inner").select(
    userDf.user,
    userDf.time.alias("user_time"),
    busDf.bus,
    busDf.time.alias("bus_time"),
    busDf.stop)

# absolute difference in seconds between the bus time and the user's report time
additional_cols = joinedDF.withColumn(
    "bus_time_diff",
    F.abs(F.unix_timestamp(col("bus_time")) - F.unix_timestamp(col("user_time"))))

# rank the rows in each (user, bus) group by that difference and keep the closest one
partDf = additional_cols.select(
    "user", "user_time", "bus", "bus_time", "stop", "bus_time_diff",
    F.row_number().over(
        Window.partitionBy("user", "bus").orderBy("bus_time_diff")).alias("rank")
).filter(col("rank") == 1)

additional_cols.show(20, False)
partDf.show(20, False)

Output:

+----+---------------------+---+---------------------+----+-------------+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|
+----+---------------------+---+---------------------+----+-------------+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:40.0|sA  |20           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 11:59:50.0|sB  |10           |
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 11:59:40.0|sB  |20           |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 12:00:10.0|sE  |10           |
+----+---------------------+---+---------------------+----+-------------+

+----+---------------------+---+---------------------+----+-------------+----+
|user|user_time            |bus|bus_time             |stop|bus_time_diff|rank|
+----+---------------------+---+---------------------+----+-------------+----+
|A   |2016-07-18 12:00:00.0|1  |2016-07-18 12:00:05.0|sC  |5            |1   |
|B   |2016-07-19 12:00:00.0|2  |2016-07-19 12:00:10.0|sC  |10           |1   |
|C   |2016-07-20 12:00:00.0|3  |2016-07-20 11:59:55.0|sD  |5            |1   |
+----+---------------------+---+---------------------+----+-------------+----+

This is very nice, it solves my problem exactly, thank you very much! – Finn


You're welcome! –