2017-10-21 50 views
0

我正在尝试使用s predfast s3 connector创建一个Kafka接收器连接器。然而,出于某种原因,日志输出报告的是SourceConnectorConfig:为什么SourceConnectorConfig报告接收器连接器?

INFO ConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.ConnectorConfig:180) 
INFO Creating connector transactions-s3-sink of type com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:178) 
INFO Instantiated connector transactions-s3-sink with version 0.0.1 of type class com.spredfast.kafka.connect.s3.sink.S3SinkConnector (org.apache.kafka.connect.runtime.Worker:181) 
INFO Finished creating connector transactions-s3-sink (org.apache.kafka.connect.runtime.Worker:194) 
INFO SourceConnectorConfig values: 
     connector.class = com.spredfast.kafka.connect.s3.sink.S3SinkConnector 
     key.converter = null 
     name = transactions-s3-sink 
     tasks.max = 1 
     transforms = null 
     value.converter = class org.apache.kafka.connect.storage.StringConverter 
(org.apache.kafka.connect.runtime.SourceConnectorConfig:180) 
INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:824) 
... 
INFO Sink task WorkerSinkTask{id=transactions-s3-sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:232) 

为什么一个SinkConnectorConfig报道还进一步在日志输出我可以看到一个WorkerSinkTask被创造?

回答

1

原因是此连接器从Connect的API(请参阅源代码here)扩展Connector抽象类而不是SinkConnector抽象类。

因此,Connect框架无法确定此连接器是源还是接收器,并且当前代码中的逻辑是,如果它不是接收器,则认为它是源。这就是你遇到这种不一致的原因。

解决方案是为连接器扩展适当的抽象类(此处为org.apache.kafka.connect.sink.SinkConnector

相关问题