2017-07-25 71 views
0

我试图使用Kafka Connect从Oracle数据库接收数据。 Kafka连接器提供的默认对象是“GenericRecord”类型。这使得它的方式过于具体,导致通过执行record.getAsString(“someIDENTIFIER”)获取数据的情况。是否有可能获取特定类型的对象而不是GenericRecord类型。Kafka Connect集成

+0

您正在使用哪个Connect插件? JDBC? DBVisit?加洲的金门大桥? –

+0

我正在使用融合JDBC。 “connector.class”:“io.confluent.connect.jdbc.JdbcSourceConnector” –

回答

2

卡夫卡连接信号源连接器与SourceRecord对象一起工作,并且卡夫卡连接工人被配置为使用一个变换器序列化的SourceRecord为二进制形式,其随后写入到卡夫卡主题。 Kafka Connect附带JSON转换器,Confluent提供Avro转换器。所以,写给Kafka的消息的二进制形式取决于你使用的是哪个转换器。

(同样地,水槽连接器与SinkRecord对象一起工作,并且卡夫卡连接工人使用其转换器反序列化从卡夫卡读取消息的二进制形式为SinkRecord对象与所述连接器的交易。)

听起来就像你正在写一个卡夫卡消费者,并在那里看到GenericRecord对象。如果是这样,那么你可能已经配置了卡夫卡连接工作者使用汇合的Avro的转换器,这对于信号源连接器像JDBC连接器SourceRecord转换成Avro的二进制格式卡夫卡连接,然后写入到卡夫卡的话题。您的客户端,然后有可能使用与Avro的解串器配置了卡夫卡的消费者,除非你给解串器的Avro的模式与它的工作将反序列化的Avro编码消息到Avro的GenericRecord

但是,您可以将应用程序配置为了解Avro架构的特定版本,并让构建系统为该版本的Avro架构生成代码,以创建将反序列化Avro编码的特定代码消息转换为由模式描述的内存中形式。在Java中,这意味着您需要从模式生成类,然后在代码中使用生成的类将GenericRecord复制到类的实例中。请参阅this complete consumer example,特别是this line,用于从GenericRecord转换。在这个例子中,LogLine从Avro的模式生成的类:Avro公司的

GenericRecord genericEvent = (GenericRecord) messageAndMetadata.message(); 
LogLine event = (LogLine) SpecificData.get().deepCopy(LogLine.SCHEMA$, genericEvent); 

一个显著的好处是,它直接支持模式演化,并汇合的模式注册采取的这种优势。因此,尽管源连接器可能会演变生成的表的Avro模式,以响应数据库中表的结构更改,但只要数据库模式发生变化以便Avro模式向后兼容,则Avro库您的客户端应用程序使用将自动从消息的Avro模式转换为您的应用程序使用的Avro模式。

当然,在某些时候,你会改变你的应用程序使用新的Avro的模式,但是,这并不必须是在同一时间。事实上,如果你配置模式注册为强制执行模式版本是向前和向后兼容,你可以或前更改客户端应用程序后数据库中更改和JDBC源连接器开始使用的Avro的新版本架构。

+0

谢谢你这么多你的回应。但我正在使用Scala编写消费者。另外,我们使用Avro生成所有模式并生成相应的类。即使在使用scala或者有不同的方式来模拟这个时,这可以实现吗? –

+0

请参阅[此问题](https://stackoverflow.com/questions/31763571/importing-avro-schema-in-scala),了解如何使用[avro4s库](https:/ /github.com/sksamuel/avro4s)来生成你的课程。也可能有其他的图书馆。 –

+0

非常感谢你... –