2017-02-11 168 views
1

我是相当新的风暴,最近改变了我的螺栓从IRichBolt 而不是BaseBasicBolt继承,这意味着我现在负责ACKING的和失败 一个元组根据我自己的逻辑。风暴拓扑:正确的方法ACK当两个螺栓有相同的源螺栓

My topology looks like this: 螺栓A向螺栓B和C发出相同的元组,每个坚持数据到卡桑德拉。 这些操作不是幂等的,并且包含对两个不同计数器列族的更新。 我只对失败的元组感兴趣,并在Cassandra的某些异常(不是读/写超时,只有QueryConsistency或Validation异常)中重播它。 问题是,如果螺栓B发生故障,相同的元组将从喷口重播并再次发射到螺栓C,螺栓C已经成功地保留其数据,从而创建错误的数据。

我试着了解acking是如何完成的(来自阅读:http://www.slideshare.net/andreaiacono/storm-44638254),但未能理解 在上述情况中发生了什么。

我想要正确解决这个问题的唯一方法是用相同的输入源创建另一个喷口:喷口1 - >螺栓A - >螺栓B,喷口1' - >螺栓A' - >螺栓C'或者将两个列族的数据保存在Bolts B和C中完成的同一个批处理语句中,方法是将它们合并为一个。

这是正确的还是我错过了什么?还有另一种可能的解决方案来正确地确认这些元组吗?

谢谢。

回答

0

你没有说你想等待多长时间来重试一次螺栓B或C中的失败更新,但不是完全失败螺栓B中的元组,你可以添加更多的流。将螺栓B的scorpion-tail输出流添加回同一螺栓B.如果螺栓B中的更新失败,则将该元组写入scorpion-tail输出流,以便它再次作为输入返回到螺栓B,从第二流。您可以丰富元组来保存一个时间戳,这样您的新流的螺栓B上的处理逻辑可以查看上次尝试的时间,如果没有足够的时间,可以再次将它写出到蝎尾流。当然,你也可以为螺栓C做同样的事情。

如果你想等很长时间重试元组(长时间处于Storm风格),你可以用Kafka主题替换那些scorpion-tail流以及必要的喷口。

+0

感谢您的回复。喜欢你的想法,但我想我宁愿让喷嘴(在我的情况下是KafkaSpout)用它的ExponentialBackOff重试尝试来处理失败,而不是我必须在我的数据库persister螺栓中实现这些功能。我将简化拓扑树,以便没有两个数据库的螺栓具有相同的“父”螺栓,将上述示例中的螺栓连接到具有批处理语句的一个螺栓中,该批处理语句写入两个列族。 – fncontroloptioncommand