2015-11-05 22 views
0

我使用的Spark 1.5.1使用Scala 2.10.5星火1.5.1,斯卡拉2.10.5:如何扩大的RDD [数组[字符串],矢量]

我有一个RDD[Array[String], Vector]每个在RDD的元素:

  • 我想借此在Array[String]每个String,并与Vector结合起来 创建元组(String, Vector),这一步会导致创建几个元组从的每个元素初始RDD

我们的目标是通过建立元组的RDD来结束:RDD[(String, Vector)],这RDD包含在上一步中创建的所有元组。

谢谢

回答

3

考虑一下:

rdd.flatMap { case (arr, vec) => arr.map((s) => (s, vec)) } 

(第一flatMap让你得到一个RDD[(String, Vector)]为输出而不是一个map,它会给你一个RDD[Array[(String, Vector)]]

1

你试过吗?

// rdd: RDD[Array[String], Vector] - initial RDD 
val new_rdd = rdd 
    .flatMap { 
    case (array: Array[String], vec: Vector) => array.map(str => (str, vec)) 
    } 

玩具为例(我在火花shell中运行它):

val rdd = sc.parallelize(Array((Array("foo", "bar"), 100), (Array("one", "two"), 200))) 
val new_rdd = rdd 
    .map { 
    case (array: Array[String], vec: Int) => array.map(str => (str, vec)) 
    } 
    .flatMap(arr => arr) 
new_rdd.collect 
res14: Array[(String, Int)] = Array((foo,100), (bar,100), (one,200), (two,200))