2017-07-17 88 views
0

我在下面的格式有星火RDD星火CombineByKey

样品RDD:

Array[(String, (String, Double))] = Array((2014-01-12 00:00:00.0,("XXX",829.95)), (2013-08-28 00:00:00.0,("YYY",469.95000000000005)), (2013-11-01 00:00:00.0,("ZZZ",129.99)), (2013-07-25 00:00:00.0,("XYZ",879.8599999999999)), (2013-10-19 00:00:00.0,("POI",989.94))) 

我试图用combineByKey总结双值从RDD给定键和下方尝试命令

rdd.combineByKey((x:String,y:Double) => (x,y) ,(acc:(String,Double),valu:(String,Double)) => acc._2+valu._2, (acc2:(Double),acc3:(Double)) => (acc2+acc3) 

但提示以下错误:....

:46: error: overloaded method value combineByKey with alternatives: [C](createCombiner: ((String, Double)) => C, mergeValue: (C, (String, Double)) => C, mergeCombiners: (C, C) => C)org.apache.spark.rdd.RDD[(String, C)] [C](createCombiner: ((String, Double)) => C, mergeValue: (C, (String, Double)) => C, mergeCombiners: (C, C) => C, numPartitions: Int)org.apache.spark.rdd.RDD[(String, C)] [C](createCombiner: ((String, Double)) => C, mergeValue: (C, (String, Double)) => C, mergeCombiners: (C, C) => C, partitioner: org.apache.spark.Partitioner, mapSideCombine: Boolean, serializer: org.apache.spark.serializer.Serializer)org.apache.spark.rdd.RDD[(String, C)] cannot be applied to ((String, Double) => (String, Double), ((String, Double), (String, Double)) => Double, (Double, Double) => Double) custMaxOrdr.combineByKey((x:String,y:Double) => (x,y) ,(acc:(String,Double),valu:(String,Double)) => acc._2+valu._2, (acc2:(Double),acc3:(Double)) => (acc2+acc3))

任何帮助表示赞赏。

感谢 Rammy

回答

0

类型的你传递不符合预期的类型的功能。让我们来看看combineByKey签名:

def combineByKey[C](
    createCombiner: V => C, 
    mergeValue: (C, V) => C, 
    mergeCombiners: (C, C) => C): RDD[(K, C)] 

所以,你需要提供:

  • C类型:的预期的综合作用的结果,你的情况我假设它是Double。当然,这种类型可以由编译器推断,如果没有明确提供的话
  • createCombiner: V => C:在我们的例子中,函数的类型为(String, Double) => Double;您正在通过(x:String,y:Double) => (x,y),它有不同的类型;根据您的描述,我认为你只是想这个函数来提取从元组中的Double,所以你需要:(in: (String, Double)) => in._2
  • mergeValue: (C, V) => C:在我们的情况下会((String, Double), Double) => Double,这又不是您所提供它具有的功能键入((String,Double), (String,Double)) => Double
  • mergeCombiners: (C, C) => C这对我们来说将是(Double, Double) => Double - 在这里你匹配功能

总之,这将总结的双重价值每个键:

val result: RDD[(String, Double)] = rdd.combineByKey(
    (in: (String, Double)) => in._2, 
    (acc: Double, valu: (String, Double)) => acc + valu._2, 
    (acc2: Double, acc3: Double) => acc2 + acc3 
) 

所有功能都可以写成省略类型:

val result2: RDD[(String, Double)] = rdd.combineByKey(
    _._2, 
    (acc, valu) => acc + valu._2, 
    (acc2, acc3) => acc2 + acc3 
) 
+0

感谢琐!它有效,,, :-) – Rammy

+0

很高兴帮助:)请upvote /接受答案,让其他用户知道这是解决。 –