2014-07-16

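Spark Streaming cumulative word count

This is a Spark Streaming program written in Scala. It counts the words arriving on a socket once every second. The result is a per-batch word count: for example, the count for seconds 0 to 1, then the count for seconds 1 to 2. Is there a way to change this program so that the counts accumulate, that is, so that it reports the word count from time 0 up to now?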

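import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations such as reduceByKey (needed before Spark 1.3)

val sparkConf = new SparkConf().setAppName("NetworkWordCount") 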
val ssc = new StreamingContext(sparkConf, Seconds(1)) 

// Create a socket stream on target ip:port and count the 
// words in input stream of \n delimited text (eg. generated by 'nc') 
// Note that no duplication in storage level only for running locally. 
// Replication necessary in distributed scenario for fault tolerance. 
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.flatMap(_.split(" ")) 
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) 
wordCounts.print() 
ssc.start() 
ssc.awaitTermination() 
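To make the problem concrete, here is a plain-Scala model (no Spark) of what the program above prints: each batch is counted on its own, the way `words.map(x => (x, 1)).reduceByKey(_ + _)` does per batch. The object `PerBatchWordCount`, the helper `countBatch` and the sample batches are hypothetical, chosen only for illustration.

```scala
// Plain-Scala model of the stateless program: every batch is counted
// independently, so nothing carries over from one batch to the next.
object PerBatchWordCount {
  // Hypothetical helper: counts the words in one batch of lines,
  // mirroring flatMap(_.split(" ")) followed by map((_, 1)).reduceByKey(_ + _).
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }

  def main(args: Array[String]): Unit = {
    // Two hypothetical one-second batches of socket input.
    val batches = Seq(Seq("a b a"), Seq("b c"))
    // Each printed map only reflects its own batch.
    batches.map(countBatch).foreach(println)
  }
}
```

The second batch reports `b -> 1` even though `b` has now been seen twice in total, which is exactly the behaviour the question wants to change.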

Answer


You can use a StateDStream for this, which is what `updateStateByKey` creates. The Spark examples include a stateful word count:

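import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream operations such as updateStateByKey (needed before Spark 1.3)

object StatefulNetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: StatefulNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Helper from the Spark examples module that reduces log noise;
    // drop this line if StreamingExamples is not on your classpath.
    StreamingExamples.setStreamingLogLevels()

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      // Sum of the counts for this word that arrived in the current batch
      val currentCount = values.foldLeft(0)(_ + _)
      // Count accumulated over all previous batches (0 the first time)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    // Stateful operations such as updateStateByKey require a checkpoint directory
    ssc.checkpoint(".")

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    val lines = ssc.socketTextStream(args(0), args(1).toInt)
    val words = lines.flatMap(_.split(" "))
    val wordDstream = words.map(x => (x, 1))

    // Update the cumulative count using updateStateByKey
    // This will give a DStream made of state (which is the cumulative count of the words)
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)
    stateDstream.print()
    ssc.start()
    ssc.awaitTermination()
  }
}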

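The way it works is that for each batch you get a `Seq[T]` of the new values for a key, and you use it to update an `Option[T]`, which acts like an accumulator. The state is an `Option` because in the first batch it will be `None`, and it stays `None` until it is updated. In this example the count is an `Int`; if you are processing a lot of data you may even need a `Long` or a `BigInt`.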
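The accumulator idea can be checked without a cluster. The following is a plain-Scala model, not Spark's actual implementation: the hypothetical `step` function plays the role of `updateStateByKey`, calling an update function with the same shape as the answer's `updateFunc` once per key per batch, but using `Long` as suggested above. The names `CumulativeWordCount` and `step`, and the sample batches, are illustrative assumptions.

```scala
// Plain-Scala model of updateStateByKey (no Spark): per-key state is
// folded across batches by repeatedly applying an update function.
object CumulativeWordCount {
  // Same shape as the answer's updateFunc, but with Long to reduce overflow risk.
  // values.sum is equivalent to values.foldLeft(0L)(_ + _).
  val updateFunc: (Seq[Long], Option[Long]) => Option[Long] =
    (values, state) => Some(values.sum + state.getOrElse(0L))

  // Hypothetical driver standing in for Spark: for every key that has state
  // or new values, call updateFunc and keep the key if it returns Some.
  def step(state: Map[String, Long], batch: Seq[String]): Map[String, Long] = {
    val newValues: Map[String, Seq[Long]] =
      batch
        .flatMap(_.split(" "))
        .groupBy(identity)
        .map { case (word, ws) => (word, ws.map(_ => 1L)) }
    val keys = state.keySet ++ newValues.keySet
    keys.flatMap { k =>
      updateFunc(newValues.getOrElse(k, Seq.empty), state.get(k)).map(count => k -> count)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Two hypothetical one-second batches of socket input.
    val batches = Seq(Seq("a b a"), Seq("b c"))
    // scanLeft keeps the state after every batch; tail drops the initial empty map.
    val states = batches.scanLeft(Map.empty[String, Long])(step).tail
    states.foreach(println)
  }
}
```

After the second batch `b` maps to 2, since its previous state (1) is added to the new batch's value (1). A key absent from the current batch is still passed to `updateFunc` with an empty `Seq`, so its count is carried forward unchanged.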