2015-10-12 14 views
3

在火花流应用程序中维护应用程序状态的最佳方法是什么?如何在Spark Streaming中构建查找映射?

我知道的方法有两种:

  1. 使用“联盟”操作追加到查找RDD和各工会后能持续它。
  2. 将状态保存在文件或数据库中,并将其加载到每批的开始位置。

我的问题是从性能的角度来看哪个更好?另外,有没有更好的方法来做到这一点?

+1

你看看在updateStateByKey?在http://spark.apache.org/docs/latest/streaming-programming-guide.html查找它并尝试一下这个例子,看看它是否适合你的需求 – ccheneson

+0

是的,我看了一下,但无法弄清楚我怎样才能在我的情况下使用它的状态是一个key和value对的映射,其中value是一个用户对象。现在,我想要更新缓存中的用户对象的每一个用户活动流。 – Soumitra

回答

3

你真的应该使用mapWithState(spec: StateSpec[K, V, StateType, MappedType])如下:

import org.apache.spark.streaming.{ StreamingContext, Seconds } 
val ssc = new StreamingContext(sc, batchDuration = Seconds(5)) 

// checkpointing is mandatory 
ssc.checkpoint("_checkpoints") 

val rdd = sc.parallelize(0 to 9).map(n => (n, n % 2 toString)) 
import org.apache.spark.streaming.dstream.ConstantInputDStream 
val sessions = new ConstantInputDStream(ssc, rdd) 

import org.apache.spark.streaming.{State, StateSpec, Time} 
val updateState = (batchTime: Time, key: Int, value: Option[String], state: State[Int]) => { 
    println(s">>> batchTime = $batchTime") 
    println(s">>> key  = $key") 
    println(s">>> value  = $value") 
    println(s">>> state  = $state") 
    val sum = value.getOrElse("").size + state.getOption.getOrElse(0) 
    state.update(sum) 
    Some((key, value, sum)) // mapped value 
} 
val spec = StateSpec.function(updateState) 
val mappedStatefulStream = sessions.mapWithState(spec) 

mappedStatefulStream.print() 
相关问题