2017-03-20 122 views
0

我正在使用用Python编写的Hive包装器将Hive数据拉入Python Jupyter笔记本中。我有TB级的数据如下所示:收藏集?

Table 1: time=t1 
uid colA 
1  A 
1  B 
1  C 
2  A 
2  B 
3  C 
3  D 

我想从那个看起来像上面的数据创建一个新的数据框(PySpark /大熊猫):

Table 2: time=t1 
uid colA 
1  [A, B, C] 
2  [A, B] 
3  [C, D] 

其中colA将是一个列表的字符串。我将如何做到这一点?我已阅读过有关collect_set()的内容,但不熟悉它的使用或适用性。

创建Table 2后,假设我有另一个表为time=t2:现在

Table 3: time=t2 
uid colA 
1  [A, B] 
2  [B] 
3  [C, D, E] 

,我想计算table 2table 3之间的差集。它应该返回3,因为这是获得从表3至表2

+0

那么,你有什么哟你试过了吗? –

回答

2

这里总结了问题的解决方案所需的添加/删除的数目。希望这会为你使用pyspark。

全球进口: -

import pyspark.sql.functions as F 
import pyspark.sql.types as T 

表2创建代码: -

df1 = sc.parallelize([ 
     [1,'A'], [1,'B'], [1,'C'], [2,'A'], [2,'B'], [3, 'C'], [3,'D'] 
     ]).toDF(['uid', 'colA']).groupBy("uid").agg(F.collect_set("colA").alias("colA")) 

df1.show() 
+---+---------+ 
|uid|  colA| 
+---+---------+ 
| 1|[A, B, C]| 
| 2| [A, B]| 
| 3| [C, D]| 
+---+---------+ 

表3创建代码: -

df2 = sc.parallelize([[1, ['A', 'B']],[2, ['B']],[3, ['C', 'D', 'E']]]).toDF(['uid', 'colA']) 
def diffUdfFunc(x,y): 
    return list(set(y).difference(set(x))) 

diffUdf = F.udf(diffUdfFunc,T.ArrayType(T.StringType())) 
finaldf = df1.withColumnRenamed("colA", "colA1").join(df2, "uid").withColumnRenamed("colA", "colA2").withColumn("diffCol", diffUdf(F.col("colA1"), F.col("colA2"))) 
finaldf.select("uid", F.col("diffCol").alias("colA")).where(F.size("colA") > 0).show() 
+---+----+ 
|uid|colA| 
+---+----+ 
| 3| [E]| 
+---+----+