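I want to fetch the rows that have been inserted into an external MySQL database, polling every 2 minutes. I want to do this with Spark Streaming.
The program returns the data once, on the first run, but after that I get the error below and the program terminates.
The error I get is: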
16/08/02 11:15:44 INFO JdbcRDD: closed connection
16/08/02 11:15:44 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 620 bytes result sent to driver
16/08/02 11:15:44 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 451 ms on localhost (1/1)
16/08/02 11:15:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/08/02 11:15:44 INFO DAGScheduler: ResultStage 0 (foreach at databaseread.scala:33) finished in 0.458 s
16/08/02 11:15:44 INFO DAGScheduler: Job 0 finished: foreach at databaseread.scala:33, took 0.664559 s
16/08/02 11:15:44 ERROR StreamingContext: Error starting the context, marking it as stopped
java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:163)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:543)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:595)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:594)
at org.test.spark.databaseread$.main(databaseread.scala:41)
at org.test.spark.databaseread.main(databaseread.scala)
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:224)
I am posting my code here. Please help me.
package org.test.spark
import org.xml.sax.helpers.NewInstance
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.DriverManager
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
object databaseread {
def main(args:Array[String])
{
val url="jdbc:mysql://localhost:3306/dbname"
val uname="root"
val pwd="root"
var i=0
val driver="com.mysql.jdbc.Driver"
val conf=new SparkConf().setAppName("DBget").setMaster("local")
val sc=new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))
val RDD=new JdbcRDD(sc,()=>DriverManager.getConnection(url,uname,pwd),
"select * from crimeweathercoords where ? = ?",1,1,1,r=>r.getString("Borough")+","+r.getString("Month"))
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()
}
}
Thanks!
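For reference, the exception message says that no output operation (such as `foreachRDD` or `print`) was registered on the `StreamingContext` before `start()` was called. In the code above, the `JdbcRDD` is an ordinary batch RDD and is never attached to a DStream. One possible way to address this is sketched below. It assumes that wrapping the RDD in a `ConstantInputDStream` is acceptable, so that the query is re-run on every batch. The object name `DatabaseReadSketch`, the helper `formatRow`, the `local[2]` master and the 120-second interval are illustrative choices, not part of the original code.

```scala
import java.sql.{DriverManager, ResultSet}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.ConstantInputDStream

object DatabaseReadSketch {

  // Hypothetical helper: joins the two selected columns into one CSV-style string.
  def formatRow(borough: String, month: String): String = borough + "," + month

  def main(args: Array[String]): Unit = {
    val url   = "jdbc:mysql://localhost:3306/dbname"
    val uname = "root"
    val pwd   = "root"

    val conf = new SparkConf().setAppName("DBget").setMaster("local[2]")
    val sc   = new SparkContext(conf)
    // Assumption: 120 seconds, to match the stated 2-minute polling interval
    // (the original code uses Seconds(60)).
    val ssc  = new StreamingContext(sc, Seconds(120))

    // Same query as in the question; the two '?' placeholders are bound to
    // the lower and upper bounds (1 and 1), giving "where 1 = 1".
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(url, uname, pwd),
      "select * from crimeweathercoords where ? = ?",
      1, 1, 1,
      (r: ResultSet) => formatRow(r.getString("Borough"), r.getString("Month")))

    // Emit the same (uncached) RDD on every batch, so the JDBC query
    // is executed again each time the batch is processed.
    val stream = new ConstantInputDStream(ssc, rows)

    // foreachRDD is an output operation; registering it is what
    // satisfies the "No output operations registered" requirement.
    stream.foreachRDD { rdd =>
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that this sketch re-reads the whole table on every batch. Returning only newly inserted rows would need an extra filter, for example on an increasing id or timestamp column, which the table shown here may or may not have.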