2015-10-16 37 views
2

I'm running a left outer join on a Spark RDD, but sometimes I get output like this:

(k, (v, Some(w)))

(k, (v, None))

What can I do so that it comes back to me as

(k, (v, (w)))

(k, (v,()))

Here is how I'm combining the two files:

def formatMap3(
    left: String = "", right: String = "")(m: String = "") = {
  val items = m.map { k => s"$k" }
  s"$left$items$right"
}



val combPrdGrp = custPrdGrp3.leftOuterJoin(cmpgnPrdGrp3) 

val combPrdGrp2 = combPrdGrp.groupByKey 

val combPrdGrp3 = combPrdGrp2.map { case (n, list) => 
    val formattedPairs = list.map { case (a, b) => s"$a $b" } 
    s"$n ${formattedPairs.mkString}" 
} 
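The Some/None text leaks in because interpolating an `Option` into a string prints its wrapper. A minimal sketch of the effect on plain Scala collections (no Spark; the keys and values here are made up):

```scala
// Minimal sketch of why Some(...)/None shows up in the formatted output.
// Plain collections stand in for RDDs; sample data is hypothetical.
object OptionLeak {
  val joined: List[(String, (Int, Option[Int]))] =
    List(("k1", (4, Some(6))), ("k8", (10, None)))

  // Interpolating the Option directly keeps the wrapper text:
  val raw: List[String] = joined.map { case (k, (v, opt)) => s"$k $v $opt" }
  // raw == List("k1 4 Some(6)", "k8 10 None")
}
```

This is exactly the shape of output described in the question: the pipeline formats the join result before unwrapping the `Option`.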

Answers

1

If you're just interested in getting formatted output without the Somes/Nones, then something like this should work:

val combPrdGrp3 = combPrdGrp2.map { case (n, list) => 
    val formattedPairs = list.map { 
    case (a, Some(b)) => s"$a $b" 
    case (a, None) => s"$a,()" 
    } 
    s"$n ${formattedPairs.mkString}" 
} 
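The pattern match in that snippet is ordinary Scala, so its formatting logic can be checked without a Spark cluster. A small sketch on plain collections (sample keys and values are made up):

```scala
// Verifying the answer's pattern-match formatting on plain collections.
// Sample data is hypothetical; the match logic mirrors the answer above.
object FormatCheck {
  val grouped: List[(String, List[(Int, Option[Int])])] =
    List(("k1", List((4, Some(6)), (7, None))))

  val out: List[String] = grouped.map { case (n, list) =>
    val formattedPairs = list.map {
      case (a, Some(b)) => s"$a $b"
      case (a, None)    => s"$a,()"
    }
    s"$n ${formattedPairs.mkString}"
  }
  // out == List("k1 4 67,()")
}
```

Note that `mkString` with no argument concatenates pairs with no separator; `mkString(", ")` may be what you actually want between pairs.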

If you have other uses in mind, you'll probably need to provide more details.

1

In Spark, the leftOuterJoin() function returns a tuple containing the join key, the value from the left RDD, and an Option of the right-side value. To extract the value from the Option, just call getOrElse() on the right-side value in the resulting RDD. For example:

scala> val rdd1 = sc.parallelize(Array(("k1", 4), ("k4", 7), ("k8", 10), ("k6", 1), ("k7", 4))) 
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[13] at parallelize at <console>:21 

scala> val rdd2 = sc.parallelize(Array(("k5", 4), ("k4", 3), ("k0", 2), ("k6", 5), ("k1", 6))) 
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[14] at parallelize at <console>:21 

scala> val rdd_join = rdd1.leftOuterJoin(rdd2).map { case (a, (b, c: Option[Int])) => (a, (b, (c.getOrElse()))) } 
rdd_join: org.apache.spark.rdd.RDD[(String, (Int, AnyVal))] = MapPartitionsRDD[18] at map at <console>:25 

scala> rdd_join.take(5).foreach(println) 
... 
(k4,(7,3)) 
(k6,(1,5)) 
(k7,(4,())) 
(k8,(10,())) 
(k1,(4,6))
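Note the `AnyVal` in the RDD signature above: `getOrElse()` with an empty argument list is adapted to `getOrElse(())`, so missing values become `()` and the element type widens to the common supertype of `Int` and `Unit`. If the right side is numeric, supplying a typed default keeps a uniform `Int`. A sketch on plain `Option`s (the default value `0` is an arbitrary choice for illustration):

```scala
// getOrElse with no useful argument widens the type; a typed default keeps it.
object GetOrElseDefaults {
  val missing: Option[Int] = None
  val present: Option[Int] = Some(3)

  val widened: AnyVal = missing.getOrElse(()) // () : Unit, lub is AnyVal
  val typed: Int      = missing.getOrElse(0)  // stays Int
  val kept: Int       = present.getOrElse(0)  // 3
}
```

Whether a sentinel like `0` is acceptable depends on your data; if `0` is a legitimate right-side value, keeping the `Option` and pattern matching (as in the first answer) is safer.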