所以我试图使用卡夫卡为我的应用程序,它有一个生产者日志记录动作到卡夫卡MQ和消费者读取它的MQ.Since我的应用程序在Go中,我使用Shopify萨拉玛使这成为可能。使用Shopify Sarama的卡夫卡错误处理
现在,我能够读出的MQ和使用
fmt.Printf
Howeveer打印邮件内容,我会很喜欢的错误处理比控制台打印好,我愿意多走一英里。
代码现在消费者连接:
mqCfg := sarama.NewConfig()
master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
panic(err) // Don't want to panic when error occurs, instead handle it
}
和消息的处理:
go func() {
defer wg.Done()
for message := range consumer.Messages() {
var msgContent Message
_ = json.Unmarshal(message.Value, &msgContent)
fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
}
}()
我的问题(我一般是新来测试卡夫卡和新卡夫卡):
上述程序中哪里可能出现错误,以便我可以处理它们?任何示例代码对于我来说都是非常棒的。我能想到的错误条件是msgContent在JSON中并不包含任何类型的ContentId字段。
在kafka中,当消费者尝试读取当前偏移量时,是否存在某些情况,但由于某种原因无法(即使JSON格式正确)?我的消费者是否有可能在失败的偏移读取之上回溯到x步骤并重新处理偏移量?还是有更好的方法来做到这一点?再次,这些情况会是什么?
我很乐意阅读和尝试的东西。
json.Unmarshal可以导致一个错误,如果你不想扔恐慌......只是不要:) – MIkCode 2015-04-01 21:28:59
哈。谢谢。任何想法,我怎么能做#2? – premunk 2015-04-01 21:41:46