2017-05-05 103 views
0

我正在从基于JMS的系统迁移到Kafka,Kafka负责同步新旧系统之间的事务。每个由新系统发布的消息都必须由用户/消费者成功处理。我不必担心消息的顺序。由于遗留系统中存在一些设计问题(悲观锁定),偶尔有一些消息到达时很少有事务可能失败,在这种情况下,我希望消息在延迟后回来。我试图找出如何处理与卡夫卡这种情况。如何使用Confluent Kafka处理消费者端的消息处理失败?

我的源和目标应用程序在.NET 4.6.1和C#中。我正在使用Confluent.Kafka v0.9.5客户端库。卡夫卡版本为0.10。

对于消费者应用程序,我已禁用自动提交并显式调用commitAsync方法以在消息成功处理后提交偏移量。以下是我如何创建消费者。

var config = new Dictionary<string, object>() 
       { 
        {"group.id", GroupId}, 
        {"client.id", ClientId}, 
        {"enable.auto.commit", false}, 
        {"bootstrap.servers", _consumerConnectionConfig.BrokerUrl}, 
        { 
         "default.topic.config", new Dictionary<string, object>() 
         { 
          {"auto.offset.reset", "latest"} 
         } 
        } 
       }; 
var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)); 

这是如何设置轮询的。

consumer.Subscribe(topics); 

while (!SubscriberCancellationToken.IsCancellationRequested) 
{ 
     consumer.Poll(TimeSpan.FromMilliseconds(1000)); 
} 

这里是在onMessage事件负责处理消息

consumer.OnMessage += (sender, message) => { 
    try 
    { 
    var payload = GetPayload(message); 
    if (_messageHandlerService.ProcessMessage(payload)) 
    {     
     consumer.CommitAsync(message).Wait(SubscriberCancellationToken); 
    } 
    else 
    { 
     //(case 1) Now what should I do????? 
    } 
    } 
    catch(Exception ex) 
    { 
     Log.Error("Unable to process xyz messages", ex); 
     throw; //??? (Case 2) should I throw the exception or should I not? 
    } 
}; 

如果可以成功处理消息,我调用commitAsync和工程就像一个魅力。现在我的问题是我该怎么做,当我无法处理消息(情况1)或发生某种异常(情况2)时。我有什么选择来处理这两种情况?

在JMS世界中,对于(情况1),我应用了延迟再发布策略。基本上,我等待1分钟,然后重新发布消息到同一主题并提交当前消息,以便重新发布的消息将再次返回。出于某种原因,如果我不能重新发布,我只会继续重试,直到我可以或直到重新启动进程。如果在我能够重新发布前重新启动进程,未提交的消息就会回来,循环再次开始。

一旦重新发布成功,如果还有其他消息已经在主题上等待,他们现在将开始移动直到火车绕过一圈,直到一切都被处理完毕。每当我无法处理案例1中的消息时,我会记录一个错误,并根据错误生成相应的警报,以便应用程序支持团队可以采取一些措施,以防万一他们必须修复遗留系统中的某些数据。在此之前,消息会一直失败,然后重新发布。

而对于案例2,我记录了细节并在JMS实现中抛出了一个异常。我现在想知道可能是我应该像案例1一样处理。

现在我的问题是我应该如何处理卡夫卡世界中的这两种情况?有没有更好的选择?

回答

1

几个选项

一是这些消息与它推到另一个卡夫卡的话题,让专门的客户交易,或者,

重试你的消费者中,直到特定的消息被处理,或者你达到一定的阈。这可以通过调用消费者的暂停方法来临时停止消息消费(我正在从Java消费者的角度来谈,请向C#客户端确认),运行您的重试逻辑。请确保您也继续调用轮询(否则消费者将被踢出组,并且其分区将被重新平衡)。完成重新尝试后,请致电恢复方法继续处理 - 现在轮询方法将返回来自卡夫卡的记录

+0

谢谢。让我尝试暂停和恢复功能。我将发布结果与C#客户端 – Vinod

+0

对不起,延迟响应。 Confluent.Kafka c#客户端暂时没有暂停和恢复方法。 – Vinod