我试图从IntelliJ的想法运行下面的代码,从卡夫卡打印消息到控制台。但它引发以下错误 - 在线程Spark结构化流 -
异常“主要” org.apache.spark.sql.AnalysisException:查询与流媒体源必须与writeStream.start(执行);; kafka
Stacktrace从Dataset.checkpoint开始并向上调整。如果我删除.checkPoint(),那么我得到一些其他错误 - 与权限相关 17/08/02 12:10:52错误StreamMetadata:写入流元数据StreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)entry in command string:null chmod 0644 C:\ Users \ rp \应用程序数据\本地的\ Temp \临时2f570b97-ad16-4f00-8356-d43ccb7660db \元
def main(args : Array[String]) = {
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate()
val canonicalSchema = new StructType()
.add("cid",StringType)
.add("uid",StringType)
.add("sourceSystem",
new StructType().add("id",StringType)
.add("name",StringType))
.add("name", new StructType()
.add("firstname",StringType)
.add("lastname",StringType))
val messages = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe","c_canonical")
.option("startingOffset","earliest")
.load()
.checkpoint()
.select(from_json(col("value").cast("string"),canonicalSchema))
.writeStream.outputMode("append").format("console").start.awaitTermination
}
谁能请帮助我了解,我做错了吗?
尝试以管理员身份运行IntelliJ。 –
谢谢你的回复,但没有奏效。 –