我正在设置一个Kafka Connect分布式模式应用程序,它将成为Kafka to S3管道。我使用Kafka 0.10.1.0-1和Kafka Connect 3.1.1-1。到目前为止,事情进展顺利,但对于我正在处理的较大系统而言,重要的一个方面是需要了解Kafka-> FileSystem管道的偏移信息。根据文档,offset.storage.topic
配置将是分布式模式应用程序用于存储偏移量信息的位置。这是有道理的,因为卡夫卡如何在'新'卡夫卡存储消费者抵消品。但是,在使用FileStreamSinkConnector进行一些测试之后,没有任何内容正在写入我的offset.storage.topic
,这是默认值:connect-offsets
。Kafka Connect offset.storage.topic没有收到消息(即如何访问Kafka Connect偏移量元数据?)
具体说来,我使用Python Kafka生产者将数据推送到主题,并使用Kafka Connect和FileStreamSinkConnect将主题中的数据输出到文件。这工作和行为,我希望连接器的行为。另外,当我停止连接器并启动连接器时,应用程序会记住主题中的状态,并且没有数据重复。但是,当我转到offset.storage.topic
以查看存储了哪些偏移量元数据时,本主题中没有任何内容。
这是我使用命令:
kafka-console-consumer --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic connect-offsets --from-beginning
我收到这个消息让此命令运行一分钟左右的时间:
Processed a total of 0 messages
所以总结一下,我有2个问题:
- 为什么我即使我的分布式应用程序保持正确的状态,s偏移量元数据是否被写入应该存储的主题?
- 如何访问Kafka Connect分布式模式应用程序的偏移量元数据信息?这对于我们团队的Lambda架构实现我们系统来说是100%必需的。
感谢您的帮助。
检查消费者是否阅读主题并且不会失败。由于消费者期望的不仅仅是消息,而是格式正确的消息(头中有5个字节,其中4个是模式注册表中的模式标识)。检查此:http://stackoverflow.com/a/41285681/1437693 –
感谢您的帮助@YuriTceretian。这涉及Kafka Connect,它有自己的内置消费者。 – PhillipAMann
Python Kafka与Connect的使用者兼容吗? –