val df1 = sc.parallelize(Seq(
    ("a1",10,"ACTIVE","ds1"), 
    ("a1",20,"ACTIVE","ds1"), 
    ("a2",50,"ACTIVE","ds1"), 
    ("a3",60,"ACTIVE","ds1")) 
).toDF("c1","c2","c3","c4")

val df2 = sc.parallelize(Seq(
    ("a1",10,"ACTIVE","ds2"), 
    ("a1",20,"ACTIVE","ds2"), 
    ("a1",30,"ACTIVE","ds2"), 
    ("a1",40,"ACTIVE","ds2"), 
    ("a4",20,"ACTIVE","ds2")) 
).toDF("c1","c2","c3","c5")


df1.show() 

// +---+---+------+---+ 
// | c1| c2|    c3| c4|
// +---+---+------+---+ 
// | a1| 10|ACTIVE|ds1| 
// | a1| 20|ACTIVE|ds1| 
// | a2| 50|ACTIVE|ds1| 
// | a3| 60|ACTIVE|ds1| 
// +---+---+------+---+ 

df2.show() 
// +---+---+------+---+ 
// | c1| c2|    c3| c4|
// +---+---+------+---+ 
// | a1| 10|ACTIVE|ds2| 
// | a1| 20|ACTIVE|ds2| 
// | a1| 30|ACTIVE|ds2| 
// | a1| 40|ACTIVE|ds2| 
// | a4| 20|ACTIVE|ds2| 
// +---+---+------+---+ 

My requirement is: I need to join the two DataFrames. The output DataFrame should contain all the records from df1, plus the records from df2 that are not in df1, but only for the "c1" values that match. The records pulled in from df2 should have column "c3" updated to "INACTIVE". How do I join the two DataFrames and change that column for the missing values?

In this example, the only matching value of "c1" is a1, so I need to take the c2 = 30 and 40 records from df2 and mark them INACTIVE.

Here is the expected output.

df_output.show() 

// +---+---+--------+---+
// | c1| c2|      c3| c4|
// +---+---+--------+---+
// | a1| 10|  ACTIVE|ds1|
// | a1| 20|  ACTIVE|ds1|
// | a2| 50|  ACTIVE|ds1|
// | a3| 60|  ACTIVE|ds1|
// | a1| 30|INACTIVE|ds1|
// | a1| 40|INACTIVE|ds1|
// +---+---+--------+---+

Can anyone help me with this?


For the INACTIVE records, is the c4 value changed from ds2 to ds1? – Pushkr

Answers


First, one small thing: I used different names for the columns in df2:

val df2 = sc.parallelize(...).toDF("d1","d2","d3","d4") 

Not a big deal, but it makes the reasoning easier to follow.

Now for the interesting part. I'll be a bit verbose for clarity:

val join = df1 
.join(df2, df1("c1") === df2("d1"), "inner") 
.select($"d1", $"d2", $"d3", lit("ds1").as("d4")) 
.dropDuplicates 

What I am doing here:

  • Inner join df1 and df2 on c1 and d1
  • Select the df2 columns, hard-coding ds1 in the last column to replace ds2
  • Drop duplicates

This basically just filters df2 down to everything that has a corresponding c1 key in df1.
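As an aside (my sketch, not part of the original answer): the same filtering can be expressed with a left-semi join, which keeps only the df2 rows whose d1 has a match in df1 and never multiplies rows, so no dropDuplicates is needed afterwards:

// Assumes the renamed d1..d4 columns above; a left-semi join returns only
// the left-hand (df2) columns for rows that have a match in df1.
val joinSemi = df2 
 .join(df1, df2("d1") === df1("c1"), "leftsemi") 
 .select($"d1", $"d2", $"d3", lit("ds1").as("d4")) 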

Next, I diff:

val diff = join 
.except(df1) 
.select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4") 

This is basically a set-difference operation between join and df1. These are the items to deactivate, so I select all the columns but replace the third with the hard-coded INACTIVE value.
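If you prefer to avoid except, an equivalent sketch (mine, assuming a row is identified by its first two columns, which holds for this data) uses a left-anti join on those keys, the same join type the last answer below relies on:

val diffAnti = join 
 .join(df1, join("d1") === df1("c1") && join("d2") === df1("c2"), "leftanti") 
 .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4") 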

All that's left is to put them together:

df1.union(diff) 

This just unions df1 with the table of deactivated values we computed earlier to produce the final result:

+---+---+--------+---+ 
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
+---+---+--------+---+ 

And you don't really need all of those intermediate values; I was just being verbose to make the whole process easy to follow.
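For reference, the same steps chained into a single expression (nothing new, just the code above strung together):

val df_output = df1.union(
 df1.join(df2, df1("c1") === df2("d1"), "inner") 
  .select($"d1", $"d2", $"d3", lit("ds1").as("d4")) 
  .dropDuplicates 
  .except(df1) 
  .select($"d1", $"d2", lit("INACTIVE").as("d3"), $"d4")) 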


'val c1Ids = df1.select("c1").as[String].collect()' 'val joinDf = df1.as("t1").join(df2.as("t2"), df1("c1") === df2("c1"), "rightouter").select($"t2.c1", $"t2.c2").distinct().withColumn("c3", lit("INACTIVE")).withColumn("c4", lit("ds1")).filter(not($"c1".isin(c1Ids: _*)))' – Ramesh


'val finalDf = df1.unionAll(joinDf)' This should give the output. – Ramesh


I'm sure there are multiple approaches, possibly better than mine, to get that output. Glad to point you in the right direction. Good luck with your project! – Vidya


Here is a quick-and-dirty solution -

from pyspark.sql import functions as F 


# find the rows from df2 that have a matching key c1 in df1 
df3 = df1.join(df2,df1.c1==df2.c1)\ 
.select(df2.c1,df2.c2,df2.c3,df2.c5.alias('c4'))\ 
.dropDuplicates() 

df3.show() 

+---+---+------+---+ 
| c1| c2|    c3| c4|
+---+---+------+---+ 
| a1| 10|ACTIVE|ds2| 
| a1| 20|ACTIVE|ds2| 
| a1| 30|ACTIVE|ds2| 
| a1| 40|ACTIVE|ds2| 
+---+---+------+---+ 

# Union df3 with df1 and change columns c3 and c4 if c4 value is 'ds2' 

df1.union(df3).dropDuplicates(['c1','c2'])\ 
.select('c1','c2',\ 
     F.when(df1.c4=='ds2','INACTIVE').otherwise('ACTIVE').alias('c3'), 
     F.when(df1.c4=='ds2','ds1').otherwise('ds1').alias('c4') 
     )\ 
.orderBy('c1','c2')\ 
.show() 

+---+---+--------+---+ 
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds1|
| a1| 40|INACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
+---+---+--------+---+ 

I liked this challenge; here is my solution.

// distinct c1 keys present in df1
val c1keys = df1.select("c1").distinct 
// df2 rows whose c1 also appears in df1
val df2_in_df1 = df2.join(c1keys, Seq("c1"), "inner") 
// of those, drop the (c1, c2) pairs already in df1 and mark the rest INACTIVE
val df2inactive = df2_in_df1.join(df1, Seq("c1", "c2"), "leftanti").withColumn("c3", lit("INACTIVE")) 
scala> df1.union(df2inactive).show 
+---+---+--------+---+ 
| c1| c2|      c3| c4|
+---+---+--------+---+
| a1| 10|  ACTIVE|ds1|
| a1| 20|  ACTIVE|ds1|
| a2| 50|  ACTIVE|ds1|
| a3| 60|  ACTIVE|ds1|
| a1| 30|INACTIVE|ds2|
| a1| 40|INACTIVE|ds2|
+---+---+--------+---+
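
Note that this keeps ds2 in the last column of the deactivated rows. If, as the expected output in the question shows, it should read ds1 instead, one possible tweak (my sketch, not part of this answer) is to overwrite that column before the union:

// df2inactive still carries df2's data-source column (named c5 here),
// so hard-code it to "ds1" before unioning with df1.
val df2inactiveDs1 = df2inactive.withColumn("c5", lit("ds1")) 
df1.union(df2inactiveDs1).show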