1

我试图从IntelliJ的想法运行下面的代码,从卡夫卡打印消息到控制台。但它引发以下错误 - 在线程Spark结构化流 -

异常“主要” org.apache.spark.sql.AnalysisException:查询与流媒体源必须与writeStream.start(执行);; kafka

Stacktrace从Dataset.checkpoint开始并向上调整。如果我删除.checkPoint(),那么我得到一些其他错误 - 与权限相关 17/08/02 12:10:52错误StreamMetadata:写入流元数据StreamMetadata(4e612f22-efff-4c9a-a47a-a36eb533e9d6) C:/ Users/rp/AppData/Local/Temp/temporary-2f570b97-ad16-4f00-8356-d43ccb7660db/metadata java.io.IOException:(null)entry in command string:null chmod 0644 C:\ Users \ rp \应用程序数据\本地的\ Temp \临时2f570b97-ad16-4f00-8356-d43ccb7660db \元

def main(args : Array[String]) = { 
val spark = SparkSession.builder().appName("SparkStreaming").master("local[*]").getOrCreate() 
    val canonicalSchema = new StructType() 
          .add("cid",StringType) 
          .add("uid",StringType) 
          .add("sourceSystem", 
           new StructType().add("id",StringType) 
               .add("name",StringType)) 
          .add("name", new StructType() 
             .add("firstname",StringType) 
             .add("lastname",StringType)) 


    val messages = spark 
        .readStream 
        .format("kafka") 
        .option("kafka.bootstrap.servers","localhost:9092") 
        .option("subscribe","c_canonical") 
        .option("startingOffset","earliest") 
        .load() 
        .checkpoint() 
.select(from_json(col("value").cast("string"),canonicalSchema)) 
.writeStream.outputMode("append").format("console").start.awaitTermination 

} 

谁能请帮助我了解,我做错了吗?

+0

尝试以管理员身份运行IntelliJ。 –

+0

谢谢你的回复,但没有奏效。 –

回答

1
  1. 结构化流式传输不支持Dataset.checkpoint()。有一张开放票可以提供更好的消息或者忽略它:https://issues.apache.org/jira/browse/SPARK-20927

  2. IOException可能是因为您没有在Windows上安装cygwin。

+0

尽管结构化流不支持'checkpoint()',它支持'option(“checkpointLocation”,“/ path/to/store”)'。你会说这个叫什么? –

+0

更新了答案。不幸的是,他们使用相同的词,但他们是完全不同的东西。 – zsxwing

+0

有没有我能找到'checkpointLocation'的实际含义以及它有何不同? –

相关问题