2016-10-03 51 views
1

我想知道在Flink中是否有内置的错误处理选项。 可能有两种情况:apache flink - 错误处理的正确方法

  1. 从卡夫卡当前消息(对我来说)是无效的,继续下一个

  2. 未捕获的异常 - 从我看到它可以完全停止流聚集。

ho我可以处理这两种情况吗? (Java代码)

回答

3

1)这是一个flatMap惯用做:如果你的消息是有效的,你去与包含您的有效元素(或许已经在同一工序处理)的列表。如果它无效,则只需返回一个空列表,以便该步骤不生成任何元素。我可以提供Scala代码,但我不熟悉Java API,所以我不想让你偏离轨道。只需检查flatMap呼叫。

2)这取决于异常的类型:如果它是由你自己的代码挑起的,正好赶上它,处理它的运营商里面,或者简单地记录它,继续前进。没有任何有关特定案例的进一步信息,这是我所知道的最好的,但再次,来自Scala我没有遇到运行时异常。

+0

谢谢!约.1。我的收集功能获得了一个Tupple。我可以用空来调用它吗?它会起作用吗?或者最好不要叫'collect'? –

+0

对,对不起。我的回答集中在Scala API上,但很明显与Java API中的收集器有点不同。你可以避免调用'collect',是的。 – Chobeat