0
我想使用spark结构化流式api读取s3中的avro文件。你可以找到有关使用kafka的信息,但是我找不到s3的任何信息。这里的问题是我不知道要设置什么格式。这里是我的简单代码:使用火花结构化流式读取s3中的avro文件
Dataset<Row> baseDataSet = sparkSession
.readStream()
.format("?") //What this format should be?
.schema(new StructType()
.add("value", "binary"))
.load("s3://path/to/streaming/key")
.select(col("value"))
.map(value -> {//do avro deserialization},Encoders.kryo(//deserialization class))
.writeStream()
.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
.format("console")
.outputMode("update")
.start();
我知道avro仍然没有在结构化流式API中实现。但是,为了读取二进制数据,我应该使用什么格式,然后以任何我想要的方式(映射函数)反序列化它。