我试图使用Kafka Connect从Oracle数据库接收数据。 Kafka连接器提供的默认对象是“GenericRecord”类型。这使得它的方式过于具体,导致通过执行record.getAsString(“someIDENTIFIER”)获取数据的情况。是否有可能获取特定类型的对象而不是GenericRecord类型。Kafka Connect集成
回答
卡夫卡连接信号源连接器与SourceRecord
对象一起工作,并且卡夫卡连接工人被配置为使用一个变换器序列化的SourceRecord
为二进制形式,其随后写入到卡夫卡主题。 Kafka Connect附带JSON转换器,Confluent提供Avro转换器。所以,写给Kafka的消息的二进制形式取决于你使用的是哪个转换器。
(同样地,水槽连接器与SinkRecord
对象一起工作,并且卡夫卡连接工人使用其转换器反序列化从卡夫卡读取消息的二进制形式为SinkRecord
对象与所述连接器的交易。)
听起来就像你正在写一个卡夫卡消费者,并在那里看到GenericRecord
对象。如果是这样,那么你可能已经配置了卡夫卡连接工作者使用汇合的Avro的转换器,这对于信号源连接器像JDBC连接器SourceRecord
转换成Avro的二进制格式卡夫卡连接,然后写入到卡夫卡的话题。您的客户端,然后有可能使用与Avro的解串器配置了卡夫卡的消费者,除非你给解串器的Avro的模式与它的工作将反序列化的Avro编码消息到Avro的GenericRecord
。
但是,您可以将应用程序配置为了解Avro架构的特定版本,并让构建系统为该版本的Avro架构生成代码,以创建将反序列化Avro编码的特定代码消息转换为由模式描述的内存中形式。在Java中,这意味着您需要从模式生成类,然后在代码中使用生成的类将GenericRecord
复制到类的实例中。请参阅this complete consumer example,特别是this line,用于从GenericRecord
转换。在这个例子中,LogLine
从Avro的模式生成的类:Avro公司的
GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message();
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent);
一个显著的好处是,它直接支持模式演化,并汇合的模式注册采取的这种优势。因此,尽管源连接器可能会演变生成的表的Avro模式,以响应数据库中表的结构更改,但只要数据库模式发生变化以便Avro模式向后兼容,则Avro库您的客户端应用程序使用将自动从消息的Avro模式转换为您的应用程序使用的Avro模式。
当然,在某些时候,你会改变你的应用程序使用新的Avro的模式,但是,这并不必须是在同一时间。事实上,如果你配置模式注册为强制执行模式版本是向前和向后兼容,你可以或前更改客户端应用程序后数据库中更改和JDBC源连接器开始使用的Avro的新版本架构。
谢谢你这么多你的回应。但我正在使用Scala编写消费者。另外,我们使用Avro生成所有模式并生成相应的类。即使在使用scala或者有不同的方式来模拟这个时,这可以实现吗? –
请参阅[此问题](https://stackoverflow.com/questions/31763571/importing-avro-schema-in-scala),了解如何使用[avro4s库](https:/ /github.com/sksamuel/avro4s)来生成你的课程。也可能有其他的图书馆。 –
非常感谢你... –
- 1. Kafka Connect Logstash
- 2. Kafka Connect MySQL
- 3. OpenID Connect集成
- 4. Kafka Connect的用例
- 5. Kafka Connect Api - 入门?
- 6. Apache Flink Kafka集成
- 7. MSSQL和KAFKA集成
- 8. Ignite和Kafka集成
- 9. Kafka Connect警报选项?
- 10. Kafka Connect + Zookepeer没有连接
- 11. 将informatica与Kafka集成
- 12. webpack和kafka-node集成
- 13. 如何集成Oracle和Kafka
- 14. angular.js与apache kafka的集成
- 15. Spring集成 - Apache ActiveMQ到Kafka
- 16. Spark流和kafka集成
- 17. 将Apache kafka与Angular.js集成
- 18. spark,kafka集成问题:对象kafka不是org.apache.spark.streaming的成员
- 19. 春季集成kafka如何处理生成kafka时的错误
- 20. Avro Records - > Kafka - > Kafka Connect Sink - > Amazon S3 Storage。幂等?
- 21. Apache-Kafka-Connect,Confluent-HDFS-Connector,未知魔术字节,Kafka-To-Hdfs
- 22. Kafka Connect SourceTask的轮询时间间隔
- 23. 是否有Kafka Connect Python客户端?
- 24. Kafka Connect HDFS接收器问题
- 25. 使用kafka connect的最佳实践
- 26. Kafka Connect - 文件源连接器错误
- 27. Kafka Connect接收器分区:子分区?
- 28. 结构化流+ Kafka集成 - SSL和Kerberos集成?
- 29. Apache Kafka与tomcat和spring的集成
- 30. 为Kafka实现Spring集成InboundChannelAdapter
您正在使用哪个Connect插件? JDBC? DBVisit?加洲的金门大桥? –
我正在使用融合JDBC。 “connector.class”:“io.confluent.connect.jdbc.JdbcSourceConnector” –