2015-04-01 32 views
3

所以我试图使用卡夫卡为我的应用程序,它有一个生产者日志记录动作到卡夫卡MQ和消费者读取它的MQ.Since我的应用程序在Go中,我使用Shopify萨拉玛使这成为可能。使用Shopify Sarama的卡夫卡错误处理

现在,我能够读出的MQ和使用

fmt.Printf 

Howeveer打印邮件内容,我会很喜欢的错误处理比控制台打印好,我愿意多走一英里。

代码现在消费者连接:

mqCfg := sarama.NewConfig() 

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg) 
if err != nil { 
    panic(err) // Don't want to panic when error occurs, instead handle it 
} 

和消息的处理:

go func() { 
    defer wg.Done() 
    for message := range consumer.Messages() { 
     var msgContent Message 
     _ = json.Unmarshal(message.Value, &msgContent) 
     fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it 
    } 
}() 

我的问题(我一般是新来测试卡夫卡和新卡夫卡):

  1. 上述程序中哪里可能出现错误,以便我可以处理它们?任何示例代码对于我来说都是非常棒的。我能想到的错误条件是msgContent在JSON中并不包含任何类型的ContentId字段。

  2. 在kafka中,当消费者尝试读取当前偏移量时,是否存在某些情况,但由于某种原因无法(即使JSON格式正确)?我的消费者是否有可能在失败的偏移读取之上回溯到x步骤并重新处理偏移量?还是有更好的方法来做到这一点?再次,这些情况会是什么?

我很乐意阅读和尝试的东西。

+0

json.Unmarshal可以导致一个错误,如果你不想扔恐慌......只是不要:) – MIkCode 2015-04-01 21:28:59

+1

哈。谢谢。任何想法,我怎么能做#2? – premunk 2015-04-01 21:41:46

回答

2

关于1)检查我在哪里记录下面的错误消息。这或多或少是我会做的。

关于2)我不知道如何在主题中向后走。通过一次又一次地创建消费者,它的可能性非常大,每次开始的偏移量减去一个。但我不会建议它,因为很可能你最终会一遍又一遍地重播相同的信息。我建议经常保存偏移量,以便在南下时恢复。

下面是我相信解决大部分问题的代码。我没有试过编译这个。而sarama api最近一直在改变,所以这个API目前可能会有所不同。

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) { 
    wg.Add(1) 
    go func(){ 
     defer wg.Done() 
     //to track the last known good offset we processed, which is 
     // updated after each successfully processed event. 
     saveprogress := func(off int64){ 
      //Save the offset somewhere...a file... 
      //Ive also used kafka to store progress 
      //using a special topic as a WAL 
     } 
     defer saveprogress(lastgoodoffset) 

     client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig()) 
     if err != nil { 
      log.Error(err) 
      return 
     } 
     defer client.Close() 
     sarama.NewConsumerConfig() 
     consumerConfig.OffsetMethod = sarama.OffsetMethodManual 
     consumerConfig.OffsetValue = int64(lastgoodoff) 
     consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig) 
     if err != nil { 
      log.Error(err) 
      return 
     } 
     defer consumer.Close() 
     for { 
      select { 
      case event := <-consumer.Events(): 
       if event.Err != nil { 
        log.Error(event.Err) 
        return 
       } 
       msgContent := &Message{} 
       err = json.Unmarshal(message.Value, msgContent) 
       if err != nil { 
        log.Error(err) 
        continue //continue to skip this message or return to stop without updating the offset. 
       } 
       // Send the message on to be processed. 
       out <- msgContent 

       lastgoodoff = event.Offset 
      } 
     } 
    }() 
}