2017-10-19 80 views
1

我是新来的火花。我使用结构化流式传输从kafka读取数据。Spark(2.2):deserialise使用结构化流式处理来自卡夫卡的节俭记录

我可以在Scala中使用此代码读取数据:

val data = spark.readStream 
     .format("kafka") 
     .option("kafka.bootstrap.servers", brokers) 
     .option("subscribe", topics) 
     .option("startingOffsets", startingOffsets) 
     .load() 

我在值列数据是节俭的记录。 Streaming API以二进制格式提供数据。我看到将数据转换为字符串或json的示例,但我无法找到任何有关如何将数据反序列化到Thrift的示例。

我该如何做到这一点?

回答

0

我在databricks的网站上发现了这个博客。它展示了如何利用Spark SQL的API来消费和转换来自Apache Kafka的复杂数据流。

https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

有解释UDF如何被用来解串器行的部分:

object MyDeserializerWrapper { 
    val deser = new MyDeserializer 
} 
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
    MyDeserializerWrapper.deser.deserialize(topic, bytes) 
) 

df.selectExpr("""deserialize("topic1", value) AS message""") 

我用java,所以只好写下面的示例UDF,以检查它怎么能在Java中被称为:

UDF1<byte[], String> mode = new UDF1<byte[], String>() { 
      @Override 
      public String call(byte[] bytes) throws Exception { 
       String s = new String(bytes); 
       return "_" + s; 
      } 
     }; 

现在我可以使用这个UDF在结构化流字数举例如下:

Dataset<String> words = df 
       //converted the DataFrame to a Dataset of String using .as(Encoders.STRING()) 
//    .selectExpr("CAST(value AS STRING)") 
       .select(callUDF("mode", col("value"))) 
       .as(Encoders.STRING()) 
       .flatMap(
         new FlatMapFunction<String, String>() { 
          @Override 
          public Iterator<String> call(String x) { 
           return Arrays.asList(x.split(" ")).iterator(); 
          } 
         }, Encoders.STRING()); 

对我来说,下一步是为节俭反序列化写一个UDF。我会尽快发布。

相关问题