2017-08-10 46 views
0

成员这是非常sad.My火花版本是2.1.1,斯卡拉版本是2.11值reduceByKey不是org.apache.spark.rdd.RDD

import org.apache.spark.SparkContext._ 
import com.mufu.wcsa.component.dimension.{DimensionKey, KeyTrait} 
import com.mufu.wcsa.log.LogRecord 
import org.apache.spark.rdd.RDD 

object PV { 

// 
    def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = { 
    val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

我得到这个错误

at 1502387780429 
[ERROR] /Users/lemanli/work/project/newcma/wcsa/wcsa_my/wcsavistor/src/main/scala/com/mufu/wcsa/component/stat/PV.scala:25: error: value reduceByKey is not a member of org.apache.spark.rdd.RDD[(K, Int)] 
[ERROR]  val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

限定有特点

trait KeyTrait[C <: LogRecord,K <: DimensionKey]{ 
    def getKey(c:C):K 
} 

它被编译,谢谢。

def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]): RDD[(K,Int)] = { 
    val t = logRecords.map(record =>(statTrait.getKey(record),1)).reduceByKey((x,y) => x + y) 

重点需要重写订购[T]。

object ClientStat extends KeyTrait[DetailLogRecord, ClientStat] { 
     implicit val c 

lientStatSorting = new Ordering[ClientStat] { 
    override def compare(x: ClientStat, y: ClientStat): Int = x.key.compare(y.key) 
    } 

     def getKey(detailLogRecord: DetailLogRecord): ClientStat = new ClientStat(detailLogRecord) 
    } 

回答

3

这来自一般使用一对rdd函数。该reduceByKey方法实际上是PairRDDFunctions类,它具有从RDD的隐式转换的方法:

implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] 

所以它需要一些隐含的类型类。通常在使用简单的混凝土类型时,这些已经在范围之内。但是,你应该能够修改你的方法也需要这些相同的implicits:

def stat[C <: LogRecord,K <:DimensionKey](statTrait: KeyTrait[C ,K],logRecords: RDD[C])(implicit kt: ClassTag[K], ord: Ordering[K]) 

或者用较新的语法:

def stat[C <: LogRecord,K <:DimensionKey : ClassTag : Ordering](statTrait: KeyTrait[C ,K],logRecords: RDD[C]) 
1

reduceByKey是仅在元组的RDDS中定义的方法,即RDD[(K, V)](K,V是只是一个约定的说,第一密钥是被第二值)。

约你想达到什么样的例子不知道,但是可以肯定的,你需要的RDD里面的值转换为两个值的元组。

相关问题