2016-02-25 40 views

回答

2

像下面这样的代码应该可以解决问题:

import scala.reflect.ClassTag 
import org.apache.spark.sql.Column 

// A binary combinator over Columns (e.g. `&&` or `||`).
type BinColOp = (Column, Column) => Column 

// Builds one predicate per element of `xs` by applying `pred` between
// `col` and that element, then folds the predicates together with `f`.
// NOTE(review): `reduce` assumes `xs` is non-empty — empty input throws.
def check[T](f: BinColOp)(
    col: Column, pred: (Column, T) => Column, xs: Seq[T]) = {
  xs.map(pred(col, _)).reduce(f)
}

// Conjunction: `col` must satisfy the predicate against EVERY element.
val all = check[Column]((lhs, rhs) => lhs && rhs) _ 
// Disjunction: `col` must satisfy the predicate against AT LEAST ONE element.
val any = check[Column]((lhs, rhs) => lhs || rhs) _ 

用法示例:

// Small example frame: numeric id `v`, string `x`, double `y`.
val df = sc.parallelize(Seq(
    (1L, "foo", 3.6),
    (2L, "bar", -1.0)
)).toDF("v", "x", "y") 

// `v` must be greater than both -1 and 1.
df.select(all($"v", (c, bound) => c > bound, Seq(lit(-1), lit(1)))).show 
// +---------------------+ 
// |((v > -1) && (v > 1))| 
// +---------------------+ 
// |    false| 
// |     true| 
// +---------------------+ 

// `x` must differ from at least one of "foo"/"baz".
// (`!==` is the pre-Spark-2.0 inequality operator; later versions use `=!=`.)
df.select(any($"x", (c, v) => c !== v, Seq(lit("foo"), lit("baz")))).show 
// +--------------------------------+ 
// |(NOT (x = foo) || NOT (x = baz))| 
// +--------------------------------+ 
// |       true| 
// |       true| 
// +--------------------------------+ 

// `x` must differ from BOTH "foo" and "baz".
// (`!==` is the pre-Spark-2.0 inequality operator; later versions use `=!=`.)
df.select(all($"x", (c, v) => c !== v, Seq(lit("foo"), lit("baz")))).show 
// +--------------------------------+ 
// |(NOT (x = foo) && NOT (x = baz))| 
// +--------------------------------+ 
// |       false| 
// |       true| 
// +--------------------------------+ 