0
我在Spark版本2.0.0中使用python版本3.5。我有一个包含两列的父表:GroupID和ID。我需要将数据分成3个子集,因此我使用分层采样。 加入PySpark会产生意想不到的结果
一旦我获得了我的子集(样本A),我执行一个左连接并查找样本ID为空。预期输出是父表的一个子集,其不含有样品A的ID任何
spark = SparkSession.builder.enableHiveSupport().appName("myApp").getOrCreate()
data = [(0, 100), (0, 101), (0, 102), (0, 103), (0, 1000), (1, 104), (1,105), (1, 106), (1, 107), (1, 1007)]
df = spark.createDataFrame(data, ['group', 'id'])
## Stratified Sampling
fractions = dict((int(item), 0.45) for item in np.arange(0,(2)))
sampled_for_A = df.sampleBy("group", fractions, seed=7)
sampled_for_A.orderBy("id").show()
## Version 1
sampled_for_BC = df.join(sampled_for_A, df.id == sampled_for_A.id, "left_outer").select(df.group, df.id, sampled_for_A.group.alias("groupA"), sampled_for_A.id.alias("id_A"))
sampled_for_BC.where(sampled_for_BC.id_A.isNull()).show()
在版本1中的BC表被填充,而是从父表中的值被复制过。
Group A
+-----+----+
|group| id|
+-----+----+
| 0| 101|
| 0| 102|
| 1| 104|
| 1| 105|
| 1| 107|
| 0|1000|
+-----+----+
BC
+-----+----+------+----+
|group| id|groupA|id_A|
+-----+----+------+----+
| 1| 107| 1| 107|
| 0| 103| 0| 103|
| 1| 104| 1| 104|
| 0|1000| 0|1000|
| 1| 106| 1| 106|
| 0| 100| 0| 100|
| 1| 105| 1| 105|
| 1|1007| 1|1007|
| 0| 101| 0| 101|
| 0| 102| 0| 102|
+-----+----+------+----+
2版createOrReplaceTempView和执行LEFT JOIN与SQL查询返回预期的结果
df.createOrReplaceTempView("parentTable")
sampled_for_A.createOrReplaceTempView("groupA")
subset_BC = spark.sql('''
SELECT a.group,
a.id,
b.group AS group_A,
b.id AS id_A
FROM parentTable a
LEFT JOIN groupA b
ON a.id = b.id
WHERE b.id IS NULL
''').show()
正如预期的那样:
BC
+-----+----+-------+----+
|group| id|group_A|id_A|
+-----+----+-------+----+
| 0| 103| null|null|
| 1| 106| null|null|
| 0| 100| null|null|
| 1|1007| null|null|
+-----+----+-------+----+
是什么样的地方版本1条,我我错过了? (另外我的数据有1亿行,甚至在第2版中执行的sql左连接也会出现奇怪的现象,我也尝试在组列上进行重新分区)。