2017-01-05 109 views
2

可以DStreamtype parameter s?类型 - 参数化DS流

如果是,如何?

当我尝试myDStream: DStream[(A, B)](类参数)lazy val qwe = mStream.mapWithState(stateSpec),我得到:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)] 
    lazy val qwe = mStream.mapWithState(stateSpec) 

回答

2

星火API的基本子集需要隐ClassTags(见Scala: What is a TypeTag and how do I use it?)和PairDStreamFunctions.mapWithState是没有什么不同。检查class definition

class PairDStreamFunctions[K, V](self: DStream[(K, V)]) 
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K]) 

and

def mapWithState[StateType: ClassTag, MappedType: ClassTag](
    spec: StateSpec[K, V, StateType, MappedType] 
): MapWithStateDStream[K, V, StateType, MappedType] = { 
    ... 
} 

如果想创造出运行在一个通用的对流和使用mapWithState你至少应该为KeyTypeValueType类型提供ClassTags功能:

def foo[T : ClassTag, U : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f) 

如果StateTypeMappedType也是参数化的,您也需要ClassTags

def bar[T : ClassTag, U : ClassTag, V : ClassTag, W : ClassTag](
    stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)