2016-03-15 91 views
2

TL; DR定制水槽拦截:截距()方法调用多次对同一事件

当水槽源未能事务推到在流水线中的下一信道,它始终保持事件实例为下一次尝试?

一般情况下,安全有有状态 Flume拦截器,其中事件的处理取决于以前处理的事件?

全部问题描述:

我正在考虑利用有关方式主题分区消费者分布在消费群体在现有的基础水槽日志进行流式重复数据删除阿帕奇卡夫卡提供担保的可能性整合架构。

使用Kafka Source for Flume和自定义路由到Kafka主题分区,我可以确保应该进入同一逻辑“重复数据删除队列”的每个事件将由集群中的单个Flume代理进行处理(只要因为集群内没有代理停止/启动)。我有使用定制的水槽拦截以下设置:

[KafkaSource重复数据删除拦截] - >()MemoryChannel) - > [HDFSSink]

看来,当水槽卡夫卡源转轮是无法将一批事件推送到内存通道,作为该批次的一部分的事件实例再次传递给我的拦截器的方法intercept()方法。在这种情况下,很容易将标签(以Flume事件标头的形式)添加到已处理的事件中,以将实际重复与重新处理的失败批次中的事件区分开来。

但是,我想知道是否有任何明确的保证,说明失败事务中的事件实例是为下一次尝试而保留的,或者是否有可能从实际源再次读取事件(在这种情况下,Kafka )并从零重新构建。在这种情况下,我的拦截器会认为这些事件是重复的并丢弃它们,尽管它们从未被传送到频道。

编辑

这是我的拦截器是如何区分一个已经从非处理事件处理的事件实例:

public Event intercept(Event event) { 
    Map<String,String> headers = event.getHeaders(); 
    // tagHeaderName is the name of the header used to tag events, never null 
    if(!tagHeaderName.isEmpty()) { 
    // Don't look further if event was already processed... 
    if(headers.get(tagHeaderName)!=null) 
     return event; 
    // Mark it as processed otherwise... 
    else 
     headers.put(tagHeaderName, ""); 
    } 
    // Continue processing of event... 
} 

回答

0

我遇到了类似的问题:

当接收器写入失败,Kafka Source仍保存已由拦截器处理的数据。在下一次尝试中,这些数据将发送给拦截器,并一次又一次地处理。通过阅读KafkaSource的代码,我相信这是bug。

我的拦截器会从原始消息中剥离一些信息,并修改原始消息。由于这个错误,重试机制永远不会按预期工作。

到目前为止,The是不容易的解决方案。

+0

事实上,当通道事务失败时,拦截器再次处理相同的Kafka事件本身并不是问题;毕竟,该事件无法交付,因此预计会有重试。我想知道的是,是否有保证JVM中的相同Event **实例**,以及拦截器做出的任何修改将再次使用,或者如果存在可能性(或者在我的情况下,*风险*)该活动将再次从卡夫卡话题中读取。 – Shadocko

+0

请注意,如果您只是要区分已在拦截器的代码中已经处理的事件实例,则可以为事件添加*标记*标头并跳过使用此*标记*标头处理事件。 – Shadocko