我试图使用scala
和spark
从List
和另一个RDD
生成RDD
。这个想法是获取一个值列表,并生成一个索引,其中包含包含每个值的原始数据集的所有条目。 下面是我试图如何使用scala和spark将列表转换为RDD
def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = {
// filter function
def hasVal(foo: String)(bar: Int): Boolean =
foo.toInt == bar
// call to sc.parallelize to ensure that an RDD is returned
sc parallelize(
foos map (_ match {
case (s: String) => (s, bars filter hasVal(s))
})
)
}
不幸的是这不sbt
> compile
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes...
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch;
[error] found : List[(String, org.apache.spark.rdd.RDD[Int])]
[error] required: Seq[(String, Iterable[Int])]
[error] Error occurred in an application involving default arguments.
[error] foos map (_ match {
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM
我真的不明白,我得到了错误的编译代码。 List
是Seq
的一个子类,我假定RDD
是Iterable
的子类。有什么明显的我错过了吗?
应当指出的是,你的解决方案关闭了一个局部变量。所以网络开销会更高。谨防分布式工作中关闭的危险。 –
@JustinPihony,谢谢你的注意,所以应该是'aggregateByKey(Iterable.empty [Int])((is,i)=> i :: is.toList,_ ++ _)'而不是'groupByKey'来避免网络开销? –
它比groupByKey更少,更多的是在RDD中使用foos –