2017-03-11 57 views
1

我试图使用scalasparkList和另一个RDD生成RDD。这个想法是获取一个值列表,并生成一个索引,其中包含包含每个值的原始数据集的所有条目。 下面是我试图如何使用scala和spark将列表转换为RDD

def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = { 
    // filter function 
    def hasVal(foo: String)(bar: Int): Boolean = 
     foo.toInt == bar 
    // call to sc.parallelize to ensure that an RDD is returned 
    sc parallelize(
     foos map (_ match { 
     case (s: String) => (s, bars filter hasVal(s)) 
     }) 
    ) 
    } 

不幸的是这不sbt

> compile 
[info] Compiling 1 Scala source to $TARGETDIR/target/scala-2.11/classes... 
[error] $TARGETDIR/src/main/scala/wikipedia/WikipediaRanking.scala:56: type mismatch; 
[error] found : List[(String, org.apache.spark.rdd.RDD[Int])] 
[error] required: Seq[(String, Iterable[Int])] 
[error] Error occurred in an application involving default arguments. 
[error]  foos map (_ match { 
[error]   ^
[error] one error found 
[error] (compile:compileIncremental) Compilation failed 
[error] Total time: 1 s, completed Mar 11, 2017 7:11:31 PM 

我真的不明白,我得到了错误的编译代码。 ListSeq的一个子类,我假定RDDIterable的子类。有什么明显的我错过了吗?

回答

3

这里是我的解决方案与-理解(应使用比笛卡尔乘积更少的内存)

def mcveInvertIndex(foos: List[String], 
         bars: RDD[Int]): RDD[(String, Iterable[Int])] = 
    { 

    // filter function 
    def hasVal(foo: String, bar: Int): Boolean = 
     foo.toInt == bar 

    // Producing RDD[(String, Iterable[Int])] 
    (for { 
     bar <- bars // it's important to have RDD 
        // at first position of for-comprehesion 
        // to produce the correct result type 
     foo <- foos 
     if hasVal(foo, bar) 
    } yield (foo, bar)).groupByKey() 
    } 
+1

应当指出的是,你的解决方案关闭了一个局部变量。所以网络开销会更高。谨防分布式工作中关闭的危险。 –

+0

@JustinPihony,谢谢你的注意,所以应该是'aggregateByKey(Iterable.empty [Int])((is,i)=> i :: is.toList,_ ++ _)'而不是'groupByKey'来避免网络开销? –

+0

它比groupByKey更少,更多的是在RDD中使用foos –

1

正如在评论中提到的,RDD不是一个可迭代的,所以你必须以某种方式将这两者合并,然后将它们聚合。这是我快速的解决方案,虽然有可能是一个更有效的方式:

def mcveInvertIndex(foos: List[String], bars: RDD[Int]): RDD[(String, Iterable[Int])] = { 
    sc.makeRDD(foos) 
     .cartesian(bars) 
     .keyBy(x=>x._1) 
     .aggregateByKey(Iterable.empty[Int])(
     (agg: Iterable[Int], currVal: (String, Int))=>{ 
      if(currVal._1.toInt != currVal._2) agg 
      else currVal._2 +: agg.toList 
     }, 
     _ ++ _ 
    ) 
    } 
相关问题