2017-01-05 32 views
1

我正在设置一个Kafka Connect分布式模式应用程序,它将成为Kafka to S3管道。我使用Kafka 0.10.1.0-1和Kafka Connect 3.1.1-1。到目前为止,事情进展顺利,但对于我正在处理的较大系统而言,重要的一个方面是需要了解Kafka-> FileSystem管道的偏移信息。根据文档,offset.storage.topic配置将是分布式模式应用程序用于存储偏移量信息的位置。这是有道理的,因为卡夫卡如何在'新'卡夫卡存储消费者抵消品。但是,在使用FileStreamSinkConnector进行一些测试之后,没有任何内容正在写入我的offset.storage.topic,这是默认值:connect-offsetsKafka Connect offset.storage.topic没有收到消息(即如何访问Kafka Connect偏移量元数据?)

具体说来,我使用Python Kafka生产者将数据推送到主题,并使用Kafka Connect和FileStreamSinkConnect将主题中的数据输出到文件。这工作和行为,我希望连接器的行为。另外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态,并且没有数据重复。但是,当我转到offset.storage.topic以查看存储了哪些偏移量元数据时,本主题中没有任何内容。

这是我使用命令:

kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning

我收到这个消息让此命令运行一分钟左右的时间:

Processed a total of 0 messages

所以总结一下,我有2个问题:

  1. 为什么我即使我的分布式应用程序保持正确的状态,s偏移量元数据是否被写入应该存储的主题?
  2. 如何访问Kafka Connect分布式模式应用程序的偏移量元数据信息?这对于我们团队的Lambda架构实现我们系统来说是100%必需的。

感谢您的帮助。

+0

检查消费者是否阅读主题并且不会失败。由于消费者期望的不仅仅是消息,而是格式正确的消息(头中有5个字节,其中4个是模式注册表中的模式标识)。检查此:http://stackoverflow.com/a/41285681/1437693 –

+0

感谢您的帮助@YuriTceretian。这涉及Kafka Connect,它有自己的内置消费者。 – PhillipAMann

+0

Python Kafka与Connect的使用者兼容吗? –

回答

0

Confluent发布的新S3 Connector可能对您感兴趣。

从您的描述来看,也许它可以显着简化您将记录从Kafka导出到S3存储桶的目标。

1

偏移量可能会提交到卡夫卡默认偏移提交主题即_consumer_offsets

1
  1. 礼居是正确的,连接-偏移用来跟踪信号源连接器偏移(其中有一个生产者而不是消费者) 。信宿连接有消费者和跟踪偏移通常的方式 - __consumer_offsets话题

  2. 看一次提交的偏移量,最好的办法是与消费群的工具:

    斌/卡夫卡的消费群体。SH --group连接弹性登录连接器--bootstrap-服务器localhost:9092 --describe

组名称始终是 “连接 - ” 和连接器名称(在我的情况下,弹性登陆 - 连接器)。这将显示该组提交的最新偏移量,它基本上确认直到该偏移量的所有消息都被写入Elastic。

+0

嗨格温,我们如何使用Java API实现读取消费者偏移量。我有要求在仪表板中显示它?截至目前,我们正在使用上面提到的上述shell命令。 – Renukaradhya

+1

您可以查看我们在实现消费者组命令行时编写的代码并获得主要想法......(这就是为什么开放源代码的原因,对吧?) –

相关问题