2017-06-26 34 views
0

我正在使用Confluent.Kafka dotnet客户端。卡夫卡消费者提交线程安全

namespace Confluent.Kafka 
{ 
    public class Consumer<TKey, TValue> : IDisposable 
    { 
     public Task<CommittedOffsets> CommitAsync(); 
    } 
} 

正如您所见,Consumer.CommitAsync是一种异步方法。在不等待回复的情况下拨打CommitAsync方法是否安全,然后再拨打Subscribe

下面的示例代码。

using (var consumer = new Confluent.Kafka.Consumer<MessageKey, byte[]>(config, new MessageKeyDeserializer(), new ByteArrayDeserializer())) 
{ 
       consumer.Subscribe(topics); 

       while (true) 
       { 
        Message<MessageKey, byte[]> msg; 
        if (consumer.Consume(out msg, TimeSpan.FromSeconds(1))) 
        { 
         // ... 

         if(msg.Offset % 100 == 0) 
         { 
          consumer.CommitAsync().ContinueWith((t) => 
          { 
           // log t.Exception 
          }, TaskContinuationOptions.OnlyOnFaulted); 
         } 
        } 
       } 
} 

回答

0

我想你想说的话下次调用消耗

是的,它是安全的,这个没有问题。 我还会添加一些提交时间窗口(例如5秒和100msgs之间的第一个例子),这样如果您在一段时间内没有收到消息,仍然会提交它们