2016-03-01

Spark Streaming, kafka: java.lang.StackOverflowError

I am getting the error below in my Spark Streaming application, which uses a Kafka input stream. When I used a socket it worked fine, but as soon as I switched to Kafka it throws this error. Does anyone know why? Do I need to change the batch interval or the checkpoint interval?

ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.StackOverflowError

My program:

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

def main(args: Array[String]): Unit = {

    // Function to create and set up a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("HBaseStream")
      val sc = new SparkContext(conf)
      // create a StreamingContext, the main entry point for all streaming functionality
      val ssc = new StreamingContext(sc, Seconds(5))
      val brokers = args(0)
      val topics = args(1)
      val topicsSet = topics.split(",").toSet
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)

      val inputStream = messages.map(_._2)
      // val inputStream = ssc.socketTextStream(args(0), args(1).toInt)
      ssc.checkpoint(checkpointDirectory) // checkpointDirectory is defined elsewhere in the program
      inputStream.print(1)
      val parsedStream = inputStream
        .map(line => {
          val splitLines = line.split(",")
          (splitLines(1), splitLines.slice(2, splitLines.length).map(_.trim.toLong))
        })
      import breeze.linalg.{DenseVector => BDV}
      import scala.util.Try

      val state: DStream[(String, Array[Long])] = parsedStream.updateStateByKey(
        (current: Seq[Array[Long]], prev: Option[Array[Long]]) => {
          prev.map(_ +: current).orElse(Some(current))
            .flatMap(as => Try(as.map(BDV(_)).reduce(_ + _).toArray).toOption)
        })

      state.checkpoint(Duration(10000))
      state.foreachRDD(rdd => rdd.foreach(Blaher.blah))
      ssc
    }
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    context.start()
    context.awaitTermination()
}
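For reference, the state-merging function passed to updateStateByKey above amounts to an element-wise sum of the previous state (if any) with every newly arrived array for the key. A minimal sketch of that logic in plain Scala, using Array.zip instead of breeze's DenseVector (the object name UpdateSketch is illustrative only; note that zip silently truncates mismatched lengths, whereas the Try in the original drops them to None):

```scala
// Sketch of the updateStateByKey merge: element-wise sum of the previous
// state (if any) with all newly arrived arrays for a key.
object UpdateSketch {
  def update(current: Seq[Array[Long]], prev: Option[Array[Long]]): Option[Array[Long]] = {
    // Prepend the previous state so it is folded in with the new batch.
    val all = prev.map(_ +: current).getOrElse(current)
    if (all.isEmpty) None
    else Some(all.reduce((a, b) => a.zip(b).map { case (x, y) => x + y }))
  }
}
```

For example, `UpdateSketch.update(Seq(Array(1L, 2L)), Some(Array(10L, 20L)))` yields `Some(Array(11L, 22L))`.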

Answers


Try deleting the checkpoint directory.

I'm not sure, but it looks like your streaming context is failing to recover from the checkpoint data. In any case, this worked for me.