
I am trying my hand at Kafka with Spark Structured Streaming, but I am getting an exception like: Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'device' given input columns: [value, offset, partition, key, timestamp, timestampType, topic];

Attaching my code:

import org.apache.spark.sql._ 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.types.StructType 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.streaming.ProcessingTime 
case class DeviceData(device: String, deviceType: String, signal: String) 

object dataset_kafka { 
    def main(args: Array[String]): Unit = { 
    val spark = SparkSession 
      .builder() 
      .appName("kafka-consumer") 
      .master("local[*]") 
      .getOrCreate() 
     import spark.implicits._ 

     spark.sparkContext.setLogLevel("WARN") 


    val df = spark 
     .readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
     .option("subscribe", "test") 
     .option("startingOffsets", "earliest") 
     .load() 
     println(df.isStreaming) 
     println(df.printSchema()) 

    val ds: Dataset[DeviceData] = df.as[DeviceData] 

    val values = df.select("device").where("signal == Strong") 

    values.writeStream 
      .outputMode("append") 
      .format("console") 
      .start() 
      .awaitTermination() 


    } 
} 

Any help on how to solve this issue?

Answer


A Kafka stream always produces the following fields: value, offset, partition, key, timestamp, timestampType, topic. In your case you are interested in value, but note that values are always deserialized as byte arrays, so you need to cast to string before deserializing the JSON.

Try the following code:

import org.apache.spark.sql.functions.from_json 
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
import spark.implicits._ 

val kafkaStream = 
    spark.readStream 
    .format("kafka") 
    .option("kafka.bootstrap.servers", "172.21.0.187:9093") 
    .option("subscribe", "test") 
    .option("startingOffsets", "earliest") 
    .load() 

// If you don't want to build the schema manually 
val schema = ExpressionEncoder[DeviceData]().schema 

// Parse the JSON payload, then flatten the struct so the top-level column
// names match the DeviceData fields before converting to a typed Dataset.
val ds = kafkaStream
    .select(from_json($"value".cast("string"), schema).alias("data"))
    .select("data.*")
    .as[DeviceData]

val values = ds.filter(_.signal == "Strong").map(_.device) 
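
To see the output, values can then be written to the same console sink the question already uses. A minimal sketch, assuming the spark session and the values Dataset from the snippets above:

// Emit the filtered device names to the console on each trigger.
// "append" mode only prints rows added since the previous micro-batch.
val query = values.writeStream
    .outputMode("append")
    .format("console")
    .start()

query.awaitTermination()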