2016-11-02 59 views
0

我在Spark版本2.0.0中使用python版本3.5。我有一个包含两列的父表:GroupID和ID。我需要将数据分成3个子集,因此我使用分层采样。 加入PySpark会产生意想不到的结果

一旦我获得了我的子集(样本A),我执行一个左连接并查找样本ID为空。预期输出是父表的一个子集,其不含有样品A的ID任何

spark = SparkSession.builder.enableHiveSupport().appName("myApp").getOrCreate() 

data = [(0, 100), (0, 101), (0, 102), (0, 103), (0, 1000), (1, 104), (1,105), (1, 106), (1, 107), (1, 1007)] 
df = spark.createDataFrame(data, ['group', 'id']) 

## Stratified Sampling 
fractions = dict((int(item), 0.45) for item in np.arange(0,(2))) 
sampled_for_A = df.sampleBy("group", fractions, seed=7) 
sampled_for_A.orderBy("id").show() 

## Version 1 

sampled_for_BC = df.join(sampled_for_A, df.id == sampled_for_A.id, "left_outer").select(df.group, df.id, sampled_for_A.group.alias("groupA"), sampled_for_A.id.alias("id_A")) 
sampled_for_BC.where(sampled_for_BC.id_A.isNull()).show() 

在版本1中的BC表被填充,而是从父表中的值被复制过。

Group A 
+-----+----+ 
|group| id| 
+-----+----+ 
| 0| 101| 
| 0| 102| 
| 1| 104| 
| 1| 105| 
| 1| 107| 
| 0|1000| 
+-----+----+ 

BC 
+-----+----+------+----+ 
|group| id|groupA|id_A| 
+-----+----+------+----+ 
| 1| 107|  1| 107| 
| 0| 103|  0| 103| 
| 1| 104|  1| 104| 
| 0|1000|  0|1000| 
| 1| 106|  1| 106| 
| 0| 100|  0| 100| 
| 1| 105|  1| 105| 
| 1|1007|  1|1007| 
| 0| 101|  0| 101| 
| 0| 102|  0| 102| 
+-----+----+------+----+ 

2版createOrReplaceTempView和执行LEFT JOIN与SQL查询返回预期的结果

df.createOrReplaceTempView("parentTable") 
sampled_for_A.createOrReplaceTempView("groupA") 

subset_BC = spark.sql(''' 
SELECT a.group, 
    a.id, 
    b.group AS group_A, 
    b.id AS id_A 
FROM parentTable a 
LEFT JOIN groupA b 
ON a.id = b.id 
WHERE b.id IS NULL 
''').show() 

正如预期的那样:

BC 
+-----+----+-------+----+ 
|group| id|group_A|id_A| 
+-----+----+-------+----+ 
| 0| 103| null|null| 
| 1| 106| null|null| 
| 0| 100| null|null| 
| 1|1007| null|null| 
+-----+----+-------+----+ 

是什么样的地方版本1条,我我错过了? (另外我的数据有1亿行,甚至在第2版中执行的sql左连接也会出现奇怪的现象,我也尝试在组列上进行重新分区)。

回答

0

版本1中的join正常,但接下来的选择覆盖已加入的数据。这是因为idgroup列来自相同的数据帧(df),所以基本上df.group == sampled_for_A.group

为了达到想要的特性,必须在加入之前别名列,例如:

>>> sampled_for_A_aliased = sampled_for_A \ 
...  .select(sampled_for_A.id.alias('id_A'), sampled_for_A.group.alias('groupA')) 
>>> df.join(sampled_for_A_aliased, df.id == sampled_for_A_aliased.id_A, "left_outer").show() 
+-----+----+----+------+               
|group| id|id_A|groupA| 
+-----+----+----+------+ 
| 1| 107|null| null| 
| 0| 103|null| null| 
| 1| 104| 104|  1| 
| 0|1000|1000|  0| 
| 1| 106| 106|  1| 
| 0| 100|null| null| 
| 1| 105| 105|  1| 
| 1|1007|null| null| 
| 0| 101| 101|  0| 
| 0| 102| 102|  0| 
+-----+----+----+------+ 
相关问题