2017-10-19 42 views
0

如斯卡拉一个例子,我有一个列表,每相匹配我希望出现两次(可能不适合这种使用情况下,最好的选择条件的项目 - 但知道哪数):迭代通过在数据帧的行和变换一对多

l.flatMap { 
    case n if n % 2 == 0 => List(n, n) 
    case n => List(n) 
} 

我愿做星火类似的东西 - 在一个数据帧遍历行,如果行符合一定的条件,那么我需要在副本中进行一些修改复制的行。如何才能做到这一点?

例如,如果我输入如下表所示:

| name | age | 
|-------|-----| 
| Peter | 50 | 
| Paul | 60 | 
| Mary | 70 | 

我想通过表迭代和测试各行对多个条件,并为每个匹配,一个条目应以创建条件匹配条件的名称。

E.g.条件#1为 “年龄> 60” 和条件#2是 “name.length < = 4”。这将导致下面的输出:

| name | age |condition| 
|-------|-----|---------| 
| Paul | 60 | 2 | 
| Mary | 70 | 1 | 
| Mary | 70 | 2 | 
+0

你应该能够做到这一点的'flatMap'为好。你能显示一些实际数据吗? – Psidom

+0

加入让它能够更清晰 –

+0

你想放弃行,其中'name.length> 4',如果'年龄> 60'也'name.length> 4'?你还需要*条件*列吗? – Psidom

回答

2

您可以filter匹配条件dataframes,最后union所有的人。

import org.apache.spark.sql.functions._ 
val condition1DF = df.filter($"age" > 60).withColumn("condition", lit(1)) 
val condition2DF = df.filter(length($"name") <= 4).withColumn("condition", lit(2)) 

val finalDF = condition1DF.union(condition2DF) 

你应该有你想要的输出

+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Mary|70 |1  | 
|Paul|60 |2  | 
|Mary|70 |2  | 
+----+---+---------+ 

我希望答案是有帮助的

+0

谢谢,这种解决方案给了我最大的灵活性,并完美适合多种条件,从配置读取条件,多个表等 –

+0

很高兴听到@EvanM。 :)感谢您的支持和接受 –

1

这里是rdd.flatMap扁平化的一种方式:

import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row 

val new_rdd = (df.rdd.flatMap(r => { 
    val conditions = Seq((1, r.getAs[Int](1) > 60), (2, r.getAs[String](0).length <= 4)) 
    conditions.collect{ case (i, c) if c => Row.fromSeq(r.toSeq :+ i) } 
})) 

val new_schema = StructType(df.schema :+ StructField("condition", IntegerType, true)) 

spark.createDataFrame(new_rdd, new_schema).show 
+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Paul| 60|  2| 
|Mary| 70|  1| 
|Mary| 70|  2| 
+----+---+---------+ 
+0

你为什么'df.rdd'使用'flatMap'? –

+0

@JacekLaskowski如果我不使用'rdd'我得到的错误。 *无法找到编码器... *。 – Psidom

+0

@Psidom尝试'进口spark.implicits._',有了它,你应该能够使用'flatMap'对数据帧来代替。 'Spark'这里是'SparkSession'。 – Shaido

1

您也可以使用UDF和explode()的组合,如下面的例子:

// set up example data 
case class Pers1 (name:String,age:Int) 
val d = Seq(Pers1("Peter",50), Pers1("Paul",60), Pers1("Mary",70)) 
val df = spark.createDataFrame(d) 

// conditions logic - complex as you'd like 
// probably should use a Set instead of Sequence but I digress.. 
val conditions:(String,Int)=>Seq[Int] = { (name,age) => 
    (if(age > 60) Seq(1) else Seq.empty) ++ 
    (if(name.length <=4) Seq(2) else Seq.empty) 
} 
// define UDF for spark 
import org.apache.spark.sql.functions.udf 
val conditionsUdf = udf(conditions) 
// explode() works just like flatmap 
val result = df.withColumn("condition", 
    explode(conditionsUdf(col("name"), col("age")))) 
result.show 

+----+---+---------+ 
|name|age|condition| 
+----+---+---------+ 
|Paul| 60|  2| 
|Mary| 70|  1| 
|Mary| 70|  2| 
+----+---+---------+