背景
我们有一个管道,它从PubSub接收消息开始,每个消息都有一个文件名。这些文件被分解到行级别,解析为JSON对象节点,然后发送到外部解码服务(解码一些编码数据)。对象节点最终转换为表行并写入Big Query。为什么GroupByKey在束流管道中复制元素(在Google Dataflow上运行时)?
看来,直到他们到达解码服务数据流不承认PubSub的消息。解码服务速度很慢,导致许多邮件一次发送时出现积压。这意味着与PubSub消息相关联的行可能需要一些时间才能到达解码服务。结果,PubSub没有收到确认并重新发送消息。我第一次尝试解决这个问题是,使用withAttributeId()将每个PubSub消息的属性添加到Reader中。但是,在测试中,这只能防止重叠到一起的重复。
我的第二次尝试PubSub的后添加fusion breaker(example)读取。这只需执行一个不必要的GroupByKey然后取消组合,该想法是GroupByKey强制数据流确认PubSub消息。
的问题
融合断路器上面所讨论的工作在,它可以防止从PubSub的重新发送的消息,但我发现这GroupByKey比它接收输出多种元素:See image。
要尝试和诊断管道的这个我已删除了部分得到一个简单的流水线仍表现出这种行为。这种行为甚至在
- PubSub被一些虚拟转换所取代,它们发出一个固定的消息列表,每个消息之间有一个小的延迟。
- 写入变换已删除。
- 所有的侧面输入/输出将被删除。
我观察到的行为是:
- 一些号码接收的消息的通过直通GroupByKey。
- 某一点后,消息由GroupByKey“保持”(大概是由于GroupByKey后的积压)。
- 这些消息最终退出GroupByKey(在大小为一组)。
- 经过短暂的延迟(大约3分钟)后,相同的消息再次退出GroupByKey(仍然以大小为1的组)。这可能会发生好几次(我怀疑它与他们花在等待输入GroupByKey上的时间成正比)。
示例作业ID为2017-10-11_03_50_42-6097948956276262224。我没有在其他跑步者身上跑横梁。
融合断路器低于:
@Slf4j
public class FusionBreaker<T> extends PTransform<PCollection<T>, PCollection<T>> {
@Override
public PCollection<T> expand(PCollection<T> input) {
return group(window(input.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break in")))))
.apply("Getting iterables after breaking fusion", Values.create())
.apply("Flattening iterables after breaking fusion", Flatten.iterables())
.apply(ParDo.of(new PassthroughLogger<>(PassthroughLogger.Level.Info, "Fusion break out")));
}
private PCollection<T> window(PCollection<T> input) {
return input.apply("Windowing before breaking fusion", Window.<T>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.discardingFiredPanes());
}
private PCollection<KV<Integer, Iterable<T>>> group(PCollection<T> input) {
return input.apply("Keying with random number", ParDo.of(new RandomKeyFn<>()))
.apply("Grouping by key to break fusion", GroupByKey.create());
}
private static class RandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
private Random random;
@Setup
public void setup() {
random = new Random();
}
@ProcessElement
public void processElement(ProcessContext context) {
context.output(KV.of(random.nextInt(), context.element()));
}
}
}
的PassthroughLoggers只需登录通过传递(我用这些来确认元素确实重复,而不是有与所述计数问题)的元素。
我怀疑这是与Windows /触发器有关,但我的理解是,使用.discardingFiredPanes()时不应重复元素 - 无论窗口设置如何。我也试过FixedWindows没有成功。
你是对的,我测试了这个,发现重复日志不会影响BigQuery写出的内容。在解码服务内存不足并导致包失败之前,我怀疑有第二个GroupByKey。然而,我很惊讶,没有任何记录表明有问题 - 这使得这个问题非常混乱,因为一切似乎运行良好。 重新洗牌不幸被标记为已弃用/仅供内部使用。 谢谢! –
重新洗牌被标记为已弃用,因为我们想为其解决的问题提出更好的解决方案,并且要明确说明它可能会在未来被替换。目前可以使用它 - 事实上,它在Beam内的许多源/汇中都用于此目的。使用Reshuffle而不是触发的好处在于,如果您尝试创建自己的Reshuffle,它将绕过缓冲。 –