2016-10-10 105 views
1

我正在尝试使用Spark的“主角”功能,但在使用它时出现奇怪的行为。主导功能不能正常工作

这里是我的输入数据样本(由 “ROW_ID” 和 “dt_capt” 排序):

row_id,  dt_capt,     dt_capt_time 
_____________________________________________________________ 
1) 1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005 
2) 1-1BHPHNU, 2016-07-20 21:07:05.779006, 210705779006 
3) 1-1BZ1F5B, 2016-07-20 21:07:05.779008, 210705779008 
4) 1-1IE18-116,2016-07-20 09:48:52.411000, 94852411000 
5) 1-1JEVXD, 2016-07-20 09:05:16.502001, 90516502001 
6) 1-1JGTHE, 2016-07-20 09:04:24.183001, 90424183001 
7) 1-1KQA6M8, 2016-07-20 21:06:02.483002, 210602483002 
8) 1-1WI4W1P, 2016-07-20 09:04:06.163001, 90406163001 
9) 1-1XIZRHX, 2016-07-20 00:00:27.646000, 27646000 
10) 1-1Y932X9, 2016-07-20 16:47:51.774001, 164751774001 
11) 1-1Y932X9, 2016-07-20 21:39:29.662002, 213929662002 
12) 1-1YYW7-3, 2016-07-20 13:32:18.110004, 133218110004 
13) 1-1YYW7-3, 2016-07-20 13:32:18.114001, 133218114001 
14) 1-21CY-79, 2016-07-20 18:12:16.663003, 181216663003 
15) 1-21CY-79, 2016-07-20 18:12:16.663008, 181216663008 
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003 
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006 
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001 
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005 
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002 

我的代码:

#All the imports 
from pyspark import SparkConf,SparkContext 
from pyspark import HiveContext 
from pyspark.sql.window import Window 
from pyspark.sql.functions import lead,col,lit,udf 
from pyspark.sql.types import * 
import io 
import sys 

conf = SparkConf().set('spark.kryoserializer.buffer.max', '512m') 
sc=SparkContext(conf=conf) 
hc=HiveContext(sc) 

#Show only errors 
sc.setLogLevel("ERROR") 

#Get input data 
delta = hc.sql("SELECT ROW_ID,DT_CAPT,UNIX_TIMESTAMP(DT_CAPT) as  DT_CAPT_TS,CAST(regexp_replace(split(LKR_GRC_S_CONTACT.dt_capt, ' ')[1], \"\:| [.]\", '') AS BIGINT) AS dt_capt_time FROM int_lkr.lkr_grc_s_contact WHERE  regexp_replace(to_date(DT_CAPT), '-', '') == \"20160720\" ORDER BY  ROW_ID,DT_CAPT") 
delta.registerTempTable("delta") 

#Compute "mvt_suiv" with the lead function 
w = Window().partitionBy("ROW_ID").orderBy("dt_capt_time") 
delta2 = delta.select("row_id","dt_capt","dt_capt_time",lead("dt_capt_time").over(w).alias("mvt_suiv")) 

而且输出的结果:

row_id,  dt_capt,     dt_capt_time, mvt_suiv 
_____________________________________________________________________________ 
1) 1-14ZBW-76, 2016-07-20 12:46:51.516005, 124651516005, NULL 
2) 1-1BHPHNU, 2016-07-20 21:07:05.779006, 210705779006, NULL 
3) 1-1BZ1F5B, 2016-07-20 21:07:05.779008, 210705779008, NULL 
4) 1-1IE18-116,2016-07-20 09:48:52.411000, 94852411000, NULL 
5) 1-1JEVXD, 2016-07-20 09:05:16.502001, 90516502001, 171798691866 
6) 1-1JGTHE, 2016-07-20 09:04:24.183001, 90424183001, NULL 
7) 1-1KQA6M8, 2016-07-20 21:06:02.483002, 210602483002, NULL 
8) 1-1WI4W1P, 2016-07-20 09:04:06.163001, 90406163001, NULL 
9) 1-1XIZRHX, 2016-07-20 00:00:27.646000, 27646000,  NULL 
10) 1-1Y932X9, 2016-07-20 16:47:51.774001, 164751774001, 213929662002 
11) 1-1Y932X9, 2016-07-20 21:39:29.662002, 213929662002, NULL 
12) 1-1YYW7-3, 2016-07-20 13:32:18.110004, 133218110004, 133218110004 
13) 1-1YYW7-3, 2016-07-20 13:32:18.114001, 133218114001, 133218114001 
14) 1-21CY-79, 2016-07-20 18:12:16.663003, 181216663003, 181216663008 
15) 1-21CY-79, 2016-07-20 18:12:16.663008, 181216663008, NULL 
16) 1-22BT-399, 2016-07-20 16:13:12.259003, 161312259003, 213929662006 
17) 1-22BT-399, 2016-07-20 21:39:29.662006, 213929662006, NULL 
18) 1-22BV-801, 2016-07-20 18:07:24.710001, 180724710001, 180952584005 
19) 1-22BV-801, 2016-07-20 18:09:52.584005, 180952584005, 181219676002 
20) 1-22BV-801, 2016-07-20 18:12:19.676002, 181219676002, NULL 

正如你所看到的,它不能正常工作。 (第5,12和13行)。

5号线:它应该是“NULL”因为没有下一行对于ROW_ID

第12行:这应该是“133218114001”,而不是“133218110004”

第13行:应该是“NULL”,因为该ROW_ID没有下一行。

我做错了什么?我尝试过使用字符串值和整数值,但我仍然有一些奇怪的行为与“领先”(与“滞后”相同的事情)。我觉得Spark中的“窗口”函数仍然包含很多错误(至少在Spark 1.5中)。有人可以证实吗?

Cloudera的版本: 5.5.1

星火版本: 1.5.0

谢谢!

回答

0

我看不出你的代码有什么问题(也许我错过了它),但问题是1.5.0有一个错误的窗口函数实现。检查问题的这种捆绑,也许你命中了其中一​​个(或副作用):

https://issues.apache.org/jira/browse/SPARK-11009

它是固定在CDH 5.5.4

https://www.cloudera.com/documentation/enterprise/release-notes/topics/cdh_rn_fixed_in_55.html

+0

谢谢您的回答。我也看到很多人在Spark 1.5.0的窗口函数中遇到问题 –

+1

所以我不得不为这个问题找到解决方法,因为我不能等待我们的Cloudera升级。我简单地使用了Hive的lead函数(作为一个单独的脚本),然后我回到Spark进行其余的计算。谢谢 ! –