2017-07-24 134 views
0

Spark 2.1.1(scala api)从s3位置流式传输json文件。Spark Streaming dropDuplicates

我想基于每个记录的json中找到的ID列(“event_id”)来删除任何传入记录。我不在乎保留哪条记录,即使记录的重复只是部分。我使用追加模式,因为数据只是通过spark.sql()方法进行了丰富/过滤,没有通过/窗口聚合进行分组。然后,我使用追加模式将实木复合地板文件写入s3。

根据文档,我应该能够使用不带水印的dropDuplicates进行重复数据删除(显然这在长时间运行的生产中无效)。然而,这种失败,出现错误:

用户类抛出异常:org.apache.spark.sql.AnalysisException:当有在流DataFrames /数据集流聚合不支持追加输出模式

该错误似乎很奇怪因为我没有进行聚合(除非dropDuplicates或sparkSQL算作聚合?)。

我知道重复不会发生在彼此的3天之外,所以我通过添加一个水印(通过紧接在重复之前使用.withWatermark())再次尝试它。但是,似乎要等到写入数据之前的3天。 (即从今天7月24日起,只有截至7月21日同一时间的数据才写入输出)。

由于没有聚合,我希望在处理批处理后立即写入每一行,并简单地丢弃任何具有在前3天内发生的事件标识的行。有没有简单的方法来实现这一点?

感谢

回答

0

解决方案。将apache.spark.sql.execution.streaming.Sink插入到配置单元表中,然后在批处理中删除重复项,并对目标配置单元表中前几天的数据执行左反连接。

1

以我为例,我曾经通过DSTREAM实现在两个方面:

方式一:

  1. 负载tmp_data(包含3个天内的唯一数据,见下文)
  2. 收到batch_data and do leftOuterJoin with tmp_data
  3. 在第二步和输出做filter新的独特的数据
  4. 更新tmp_data新的独特的数据通过第二步的结果,并删除旧数据(3天以上)
  5. 保存tmp_data HDFS上或任何
  6. 重复上面的再次并再次

另一种方式:

  1. 创建MySQL的一个表并设置UNIQUE INDEXEVENT_ID
  2. 收到batch_data,只是保存事项标识+ EVENT_TIME +无论到mysql
  3. MySQL会忽略重复我们使用的是组织的自定义实现自动
相关问题