0

我想使用spark结构化流式api读取s3中的avro文件。你可以找到有关使用kafka的信息,但是我找不到s3的任何信息。这里的问题是我不知道要设置什么格式。这里是我的简单代码:使用火花结构化流式读取s3中的avro文件

Dataset<Row> baseDataSet = sparkSession    
    .readStream()        
    .format("?") //What this format should be?        
    .schema(new StructType()     
      .add("value", "binary"))   
    .load("s3://path/to/streaming/key")  
    .select(col("value")) 
    .map(value -> {//do avro deserialization},Encoders.kryo(//deserialization class))          
    .writeStream() 
    .trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) 
    .format("console") 
    .outputMode("update") 
    .start(); 

我知道avro仍然没有在结构化流式API中实现。但是,为了读取二进制数据,我应该使用什么格式,然后以任何我想要的方式(映射函数)反序列化它。

回答

0

有一个third-party package for avro。您可以下载该jar并使用spark通过指定format("com.databricks.spark.avro")直接加载avro文件。

目前没有办法读取结构化流中的整个文件以便稍后应用反序列化。

但是,如果您仍然想要自定义解串器,您可以通过实施trait DataSourceRegister来开发自定义数据源。例如,您可能需要检查spark-avro package

在情况下,如果你需要输入数据转换成字节数组,你可以使用这样的:通过线虽然

session 
    .readStream() 
    .textFile("path-to-folder") 
    .as(Encoders.BINARY()) 
    .map(bytesToStringMapper, Encoders.STRING()) 
    .writeStream() 
    .outputMode(OutputMode.Append()) 
    .format("text") 
    .option("path", "path-to-folder") 
    .option("checkpointLocation", "path-to-folder") 
    .queryName("test-query") 
    .start(); 

目前的方式将文件加载文本行。这意味着bytesToStringMapper接收单行作为字节数组并将其转换为字符串。