2014-04-23 90 views
3

我试图从我的Kafka 0.8.1群集中检索数据。我已经创建了一个ZookeeperConsumerConnector的实例,然后尝试对其调用createMessageStreams。然而,无论我做什么,似乎createMessageStreams只是挂起,并永远不会返回,即使这是我与卡夫卡做的唯一的事情。Kafka的ZookeeperConsumerConnector.createMessageStreams永不返回

阅读邮件列表看起来这有时可能会发生的原因有几个,但据我所知我没有做过任何这些事情。

另外,我会指出,我使用CLJ-卡夫卡其实这样用Clojure,但我怀疑CLJ-卡夫卡不是问题,因为我有这个问题,即使我运行此代码:

(.createMessageStreams 
    (clj-kafka.consumer.zk/consumer {"zookeeper.connect" "127.0.0.1:2181" 
            "group.id" "my.consumer" 
            "auto.offset.reset" "smallest" 
            "auto.commit.enable" "false"}) 
    {"mytopic" (int 1)}) 

clj-kafka.consumer.zk/consumer只是使用Consumer.createJavaConsumerConnector创建一个ZookeeperConsumerConnector没有做任何太花哨。

此外,也有一定的“mytopic”消息,因为在命令行中,我可以运行以下命令,回到一切我已经发送到话题:

% kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic mytopic --from-beginning 

因此,这也并不是说话题是空的。

此时感觉难倒。想法?

ETA:“hang”我想我真正的意思是它似乎旋转了一个线程,然后停留在它从不做任何事情。如果我从REPL运行这段代码我可以通过敲击控制-C把它弄出来,然后我得到这个错误:

IllegalMonitorStateException java.util.concurrent.locks.ReentrantLock$Sync.tryRelease (ReentrantLock.java:155) 
+0

我还没有解决这个问题,但我暂时切换到RabbitMQ,我似乎有工作,应该抱着我一段时间。尽管如此,仍然需要解决方案,因为我最终需要处理比RabbitMQ支持更多的音量。 –

+0

如果您使用一些随机的新group.id而不是“my.consumer”会发生什么? “my.consumer”的偏移量可能已经被提交给ZK。 – sandris

+0

@sandris我不记得我是否特别试过,但我相当肯定,即使是第一次使用新鲜的ZK和Kafka,我也看到了这个问题,所以我提供的任何东西都应该有group.id是新的。 –

回答

1

中断REPL当我遇到相同的异常相同的问题。它挂起的原因是由于consumer.zk命名空间中的lazy-iterate函数。从中读取消息的队列是LinkedBlockingQueue,并且在该队列上的lazy-iterate函数调用.hasNext中调用.hasNext。这会在队列上创建一个读锁,并会阻塞并等待有可用于取下队列的内容。这意味着lazy-iterate函数永远不会实际返回。懒迭代由“消息”函数调用,如果你不这样做

(take 2 (messages "mytopic" some-consumer)) 

则消息功能将永远不会返回并无限期挂起。我认为这是clj-kafka中的一个错误(或设计缺陷)。为了说明这确实是发生了什么,请尝试在消费者配置中设置“consumer.timeout.ms”“0”。它会抛出TimeoutExpection并将控制权返回给REPL。

这进一步造成了'with-resource'宏的问题。宏对消费者,关闭功能和主体进行绑定;它调用body然后关闭fn。如果在体内,你打电话给'消息',身体永远不会返回,因此关机功能永远不会被调用。如果关闭被调用,消息函数WOULD会返回,因为shutdown会在消息队列中发出消息,通知消费者清理其资源和线程以准备GC。这个宏使应用程序进入一个状态,从主循环中退出的唯一方法是杀死应用程序(或调用它的线程)本身。图书馆在准备好生产环境之前肯定有一段路要走。