I am using the Kafka Connect Cassandra sink to push data from Kafka to Cassandra. Kafka receives all of its data from Logstash in JSON format, and I want to push that same data on to Cassandra through the kafka-connect-cassandra sink connector.

I followed this tutorial: http://docs.datamountaineer.com/en/latest/cassandra-sink.html, except that I switched the converters from Avro to JSON in the Schema Registry properties. After creating a new connector instance I keep hitting the same error:
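For reference, the sink configuration in that tutorial looks roughly like the sketch below. This is paraphrased from the linked docs rather than my exact file: the topic, keyspace, table names, and credentials are the tutorial's examples, and the KCQL property was renamed in later connector versions (connect.cassandra.export.route.query became connect.cassandra.kcql), so check the version you run.

name=cassandra-sink-orders
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=orders-topic
# KCQL mapping from the Kafka topic to the Cassandra table (tutorial example)
connect.cassandra.export.route.query=INSERT INTO orders SELECT * FROM orders-topic
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=demo
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra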

[2017-02-10 09:01:49,978] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390) 
java.lang.RuntimeException: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct 
     at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58) 
     at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67) 
     at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.handleTry(CassandraJsonWriter.scala:40) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.insert(CassandraJsonWriter.scala:144) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.write(CassandraJsonWriter.scala:104) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72) 
     at scala.Option.foreach(Option.scala:257) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.put(CassandraSinkTask.scala:72) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct 
     at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125) 
     at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125) 
     at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30) 
     ... 3 more 
[2017-02-10 09:01:49,981] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391) 

I have also tried a different approach: sending Avro from Logstash to Kafka. But I don't know what to do with attributes like @timestamp or @version. In other words, I don't know how to write an Avro schema that will successfully parse those two fields, because I get errors about the schema not recognizing attributes that start with @.
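(If it helps explain those errors: Avro record field names must match [A-Za-z_][A-Za-z0-9_]*, so names like @timestamp and @version are simply illegal in an Avro schema. One workaround, sketched below on the assumption that a Logstash mutate filter is acceptable in the pipeline, is to rename those fields before they reach the Avro serializer; the target names are my choice.)

filter {
  mutate {
    # rename Logstash's built-in fields to Avro-legal names (hypothetical names)
    rename => { "@timestamp" => "timestamp" }
    rename => { "@version" => "version" }
  }
}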

Can anyone give me some advice, or suggest another sink? It doesn't have to be Confluent; I just need a sink that can get this data from Kafka into Cassandra. Thanks, folks!

Where does the Kafka topic get its data from: Connect, or somewhere else? Is the data in Kafka Avro-serialized or JSON? Can you post your sink connector configuration?

Answer

You have to change a few properties in etc/schema-registry/connect-avro-distributed.properties. Comment out these lines:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

and add these lines:

key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
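A note on what schemas.enable does, since it matters here (this is my addition, not part of the original answer): with the JsonConverter and schemas.enable=true, every record must be wrapped in a schema/payload envelope like the hypothetical example below; with schemas.enable=false, the converter accepts the bare JSON payload but hands the sink a schemaless java.util.Map rather than a Struct, so a sink that requires a Struct needs schema information from somewhere, either this envelope or Avro plus the Schema Registry.

{
  "schema": {
    "type": "struct",
    "optional": false,
    "fields": [
      { "field": "id", "type": "int32", "optional": false },
      { "field": "created", "type": "string", "optional": true }
    ]
  },
  "payload": { "id": 1, "created": "2017-02-10 09:01:49" }
}

With schemas.enable=false the same record would be just the bare payload: {"id": 1, "created": "2017-02-10 09:01:49"}.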