我试图从我的Kafka 0.8.1群集中检索数据。我已经创建了一个ZookeeperConsumerConnector
的实例,然后尝试对其调用createMessageStreams
。然而,无论我做什么,似乎createMessageStreams
只是挂起,并永远不会返回,即使这是我与卡夫卡做的唯一的事情。Kafka的ZookeeperConsumerConnector.createMessageStreams永不返回
阅读邮件列表看起来这有时可能会发生的原因有几个,但据我所知我没有做过任何这些事情。
另外,我会指出,我使用CLJ-卡夫卡其实这样用Clojure,但我怀疑CLJ-卡夫卡不是问题,因为我有这个问题,即使我运行此代码:
(.createMessageStreams
(clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181"
"group.id" "my.consumer"
"auto.offset.reset" "smallest"
"auto.commit.enable" "false"})
{"mytopic" (int 1)})
和clj-kafka.consumer.zk/consumer
只是使用Consumer.createJavaConsumerConnector
创建一个ZookeeperConsumerConnector
没有做任何太花哨。
此外,也有一定的“mytopic”消息,因为在命令行中,我可以运行以下命令,回到一切我已经发送到话题:
% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning
因此,这也并不是说话题是空的。
此时感觉难倒。想法?
ETA:“hang”我想我真正的意思是它似乎旋转了一个线程,然后停留在它从不做任何事情。如果我从REPL运行这段代码我可以通过敲击控制-C把它弄出来,然后我得到这个错误:
IllegalMonitorStateException java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155)
我还没有解决这个问题,但我暂时切换到RabbitMQ,我似乎有工作,应该抱着我一段时间。尽管如此,仍然需要解决方案,因为我最终需要处理比RabbitMQ支持更多的音量。 –
如果您使用一些随机的新group.id而不是“my.consumer”会发生什么? “my.consumer”的偏移量可能已经被提交给ZK。 – sandris
@sandris我不记得我是否特别试过,但我相当肯定,即使是第一次使用新鲜的ZK和Kafka,我也看到了这个问题,所以我提供的任何东西都应该有group.id是新的。 –