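Kafka Connect Cassandra connector
I am using the Kafka Cassandra sink to push data from Kafka to Cassandra. Kafka continuously receives data from Logstash in JSON format, and I want to push the same data to Cassandra through the connector.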
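I followed this tutorial: http://docs.datamountaineer.com/en/latest/cassandra-sink.html, but I changed the format in the schema registry from Avro to JSON. After that I created a new instance, and this error keeps showing up: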
[2017-02-10 09:01:49,978] ERROR Task cassandra-sink-orders-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:390)
java.lang.RuntimeException: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.errors.ThrowErrorPolicy.handle(ErrorPolicy.scala:58)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleError(ErrorHandler.scala:67)
at com.datamountaineer.streamreactor.connect.errors.ErrorHandler$class.handleTry(ErrorHandler.scala:48)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.handleTry(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.insert(CassandraJsonWriter.scala:144)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.write(CassandraJsonWriter.scala:104)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask$$anonfun$put$2.apply(CassandraSinkTask.scala:72)
at scala.Option.foreach(Option.scala:257)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.put(CassandraSinkTask.scala:72)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:370)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:227)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: java.util.HashMap cannot be cast to org.apache.kafka.connect.data.Struct
at com.datamountaineer.streamreactor.connect.schemas.ConverterUtil$class.convert(ConverterUtil.scala:59)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.convert(CassandraJsonWriter.scala:40)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter.com$datamountaineer$streamreactor$connect$cassandra$sink$CassandraJsonWriter$$toJson(CassandraJsonWriter.scala:154)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply$mcV$sp(CassandraJsonWriter.scala:127)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraJsonWriter$$anonfun$3$$anonfun$apply$1.apply(CassandraJsonWriter.scala:125)
at com.datamountaineer.streamreactor.connect.concurrent.ExecutorExtension$RunnableWrapper$$anon$1.run(ExecutorExtension.scala:30)
... 3 more
[2017-02-10 09:01:49,981] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:391)
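Judging from the trace, the sink's ConverterUtil.convert expects an org.apache.kafka.connect.data.Struct but receives a java.util.HashMap, which is what Kafka Connect's JsonConverter produces for plain JSON objects when schemas.enable is false. A minimal sketch of one possible workaround, assuming the worker uses org.apache.kafka.connect.json.JsonConverter with value.converter.schemas.enable=true: wrap each flat Logstash event in the schema/payload envelope that this converter reads, so the value arrives as a Struct. The helper names, the struct name logstash.event, and the restriction to flat string/number/boolean events are illustrative assumptions.

```python
import json


def connect_type(value):
    """Map a Python value to a Kafka Connect JSON schema type name (assumed flat events only)."""
    # bool must be checked before int, because bool is a subclass of int in Python
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "int64"
    if isinstance(value, float):
        return "float64"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def wrap_with_schema(event, name="logstash.event"):
    """Wrap a flat JSON event in the {"schema": ..., "payload": ...} envelope
    that JsonConverter expects when schemas.enable=true.
    The default struct name is hypothetical."""
    fields = [
        {"field": key, "type": connect_type(value), "optional": False}
        for key, value in event.items()
    ]
    schema = {"type": "struct", "name": name, "optional": False, "fields": fields}
    return {"schema": schema, "payload": event}


if __name__ == "__main__":
    # Example event shaped like Logstash JSON output (values made up)
    event = {"@timestamp": "2017-02-10T09:01:49.978Z", "@version": "1", "message": "hello"}
    print(json.dumps(wrap_with_schema(event)))
```

The trade-off is size: every record carries its own schema, so messages grow noticeably compared with bare JSON.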
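I have tried another approach, sending Avro to Kafka from Logstash, but I don't know how to handle attributes like @timestamp and @version. In other words, I don't know how to write an Avro schema that will successfully parse these two fields, because I get errors that the schema does not recognize the @ character.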
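Avro names must start with a letter or underscore and contain only letters, digits and underscores, so @timestamp and @version are rejected as field names. A minimal sketch, assuming it is acceptable to rename those fields before serialization: a hypothetical renaming rule drops the leading @ and replaces any other illegal character with _, and an Avro record schema is then built from one sample event. The names avro_safe, sanitize_event, avro_record_schema and LogstashEvent are made up for illustration, and field types are inferred from Python values.

```python
import re

# Avro names must match [A-Za-z_][A-Za-z0-9_]*
AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def avro_safe(name):
    """Hypothetical renaming rule: drop leading '@', replace other illegal characters with '_'."""
    cleaned = re.sub(r"[^A-Za-z0-9_]", "_", name.lstrip("@"))
    # After the substitution only ASCII letters, digits and '_' remain;
    # a name may not start with a digit (or be empty), so prefix '_' in that case.
    if not cleaned or not (cleaned[0].isalpha() or cleaned[0] == "_"):
        cleaned = "_" + cleaned
    return cleaned


def sanitize_event(event):
    """Return a copy of a flat event with Avro-safe keys; fail on name collisions."""
    out = {}
    for key, value in event.items():
        new_key = avro_safe(key)
        if new_key in out:
            raise ValueError(f"field name collision after renaming: {key!r} -> {new_key!r}")
        out[new_key] = value
    return out


def avro_type(value):
    """Infer an Avro primitive type from a Python value (assumption: flat events only)."""
    if isinstance(value, bool):  # check before int: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    raise TypeError(f"unsupported value type: {type(value).__name__}")


def avro_record_schema(sample_event, name="LogstashEvent"):
    """Build an Avro record schema (as a dict) from one sample event."""
    clean = sanitize_event(sample_event)
    fields = [{"name": key, "type": avro_type(value)} for key, value in clean.items()]
    return {"type": "record", "name": name, "fields": fields}
```

The same rename could alternatively be done in Logstash before the event reaches Kafka, for example with the rename option of the mutate filter.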
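Can someone give me advice or suggest another sink? It doesn't have to be Confluent; I just need a sink that can get this data from Kafka into Cassandra. Thanks, fellows!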
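What is the source of the Kafka topic? Is it written by Connect or by something else? Is the data in Kafka Avro-serialized or JSON? Can you post your sink connector config?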
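To answer the serialization question, a quick heuristic check on raw record value bytes (fetched with any Kafka client) can help. Values written by Confluent Schema Registry serializers start with a 0x00 magic byte followed by a 4-byte big-endian schema ID, while plain JSON values decode as UTF-8 JSON text. The function name and result labels are hypothetical, and the check is a heuristic, not a guarantee.

```python
import json
import struct


def sniff_value_format(raw):
    """Guess how a Kafka record value was serialized (heuristic only).

    Returns a (label, schema_id) tuple; schema_id is None unless the
    Schema Registry framing is detected."""
    # Schema Registry wire format: magic byte 0x00, then a 4-byte big-endian schema ID
    if len(raw) >= 5 and raw[0] == 0:
        (schema_id,) = struct.unpack(">I", raw[1:5])
        return ("schema-registry", schema_id)
    try:
        json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, ValueError):  # JSONDecodeError is a ValueError
        return ("unknown", None)
    return ("json", None)
```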