Spark 2.1.1(scala api)从s3位置流式传输json文件。Spark Streaming dropDuplicates
我想基于每个记录的json中找到的ID列(“event_id”)来删除任何传入记录。我不在乎保留哪条记录,即使记录的重复只是部分。我使用追加模式,因为数据只是通过spark.sql()方法进行了丰富/过滤,没有通过/窗口聚合进行分组。然后,我使用追加模式将实木复合地板文件写入s3。
根据文档,我应该能够使用不带水印的dropDuplicates进行重复数据删除(显然这在长时间运行的生产中无效)。然而,这种失败,出现错误:
用户类抛出异常:org.apache.spark.sql.AnalysisException:当有在流DataFrames /数据集流聚合不支持追加输出模式
该错误似乎很奇怪因为我没有进行聚合(除非dropDuplicates或sparkSQL算作聚合?)。
我知道重复不会发生在彼此的3天之外,所以我通过添加一个水印(通过紧接在重复之前使用.withWatermark())再次尝试它。但是,似乎要等到写入数据之前的3天。 (即从今天7月24日起,只有截至7月21日同一时间的数据才写入输出)。
由于没有聚合,我希望在处理批处理后立即写入每一行,并简单地丢弃任何具有在前3天内发生的事件标识的行。有没有简单的方法来实现这一点?
感谢