2017-01-16 80 views
1

我试图设置 Spark 结构化流(Structured Streaming)来读取 MQTT 源,但在收到第二条消息时它会抛出异常。(标题:MQTT 结构化流式传输)

我有以下代码:

import java.sql.Timestamp 

import org.apache.bahir.sql.streaming.mqtt._ 
import org.apache.commons.io.FileUtils 
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession} 
import org.apache.spark.sql.types.StructType 

object App {

  /** MQTT broker the stream connects to. */
  private val BrokerUrl = "tcp://10.0.0.129:1883"

  /** MQTT topic the stream subscribes to. */
  private val Topic = "measurement"

  /**
   * Entry point: subscribes to an MQTT topic through the Apache Bahir
   * structured-streaming source and prints every micro-batch to the console.
   *
   * Rows are decoded as `(String, Timestamp)` pairs.
   * NOTE(review): presumably (payload, timestamp) — confirm the column order
   * against the Bahir connector version in use.
   *
   * An `AbstractMethodError` on `MQTTTextStreamSource.commit` at runtime means
   * the Bahir connector jar was compiled against a different Spark version
   * than the one on the classpath; the connector and Spark versions must match.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // NOTE(review): the app name looks copied from the word-count example;
    // kept unchanged so monitoring/UI labels stay the same.
    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .master("local[*]")
      .getOrCreate()

    // Release the SparkSession even when the query terminates with an
    // exception (awaitTermination rethrows it as StreamingQueryException).
    try {
      import spark.implicits._

      // Read records from the MQTT broker (not a socket).
      val lines = spark.readStream
        .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
        .option("topic", Topic)
        .option("username", "spark")
        .option("password", "******")
        .load(BrokerUrl)
        .as[(String, Timestamp)]

      val query = lines.writeStream.format("console").start()
      query.awaitTermination()
    } finally {
      spark.stop()
    }
  }
}

当我收到第二条消息时,出现了以下异常:

17/01/16 16:18:33 ERROR StreamExecution: Query query-1 terminated with error 
java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
Exception in thread "stream execution thread for query-1" java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
org.apache.spark.sql.streaming.StreamingQueryException: Query query-1 terminated with exception: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:248) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:142) 
Caused by: java.lang.AbstractMethodError: org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource.commit(Lorg/apache/spark/sql/execution/streaming/Offset;)V 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:359) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1$$anonfun$apply$mcV$sp$5.apply(StreamExecution.scala:358) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:893) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) 
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) 
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply$mcV$sp(StreamExecution.scala:358) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch$1.apply(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:345) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcZ$sp(StreamExecution.scala:219) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:213) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$reportTimeTaken(StreamExecution.scala:656) 
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:212) 
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43) 
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:208) 
    ... 1 more 

有没有人有这个问题?

+0

您正在使用哪个版本的Spark和MQTT? –

+0

@YuvalItzchakov Spark 2.0.2和MQTT 3.1 –

+0

MQTT 3.1是否支持Spark 2.0.2? –

回答

-1

您使用的 spark-MQTT jar 包与您的 Spark 版本是否一致?版本不一致可能会导致此类问题。我在使用 Cloudera Express 的 Spark 1.6 时遇到过类似问题,将两者升级到相同版本后,问题就解决了。

+0

太棒了!我刚发现自己使用的是 Spark 2.1.0,而 MQTT jar 是针对 Spark 2.0.1 构建的。 –