2017-08-16 75 views
1

我有一个kafka连接插件,部署在一个kafka集群中(独立模式下,仅用于测试,其想法是分发)。此kafka连接插件使用管理员连接到群集的zookeper,并从中获取一些信息以决定如何处理这些消息。Kafka Connect + Zookepeer没有连接

的代码是这样的:

protected CuratorFramework getCurator(final String zkConnection) { 
    final CuratorFramework curator = CuratorFrameworkFactory.newClient(zkConnection, 
      new ExponentialBackoffRetry(1000, 3)); 
    curator.start(); 
    return curator; 
} 

treeCache = new TreeCache(curator, settings.getConfigurationRoot()); 
... 
treeCache.start() 

它超时在于TreeCache的开始,配置根路径存在于本地动物园管理员(已确认在动物园管理员壳做一个LS,以及用于zkConnection串I”已经试过用:

  • 本地主机:2181(动物园管理员在该端口上运行)
  • 本地主机:2181,本地主机:2182,本地主机:2183
  • :2181
  • :2181,:2182,:2183
  • 127.0.0.1:2181 ...等等

在云中运行卡夫卡流模块使用相同的一段代码,动物园管理员连接...有什么想法这里发生了什么?

谢谢!

回答

0

发现错误!我在try-with-resources声明中调用了getCurator,并且在启动treeCache之前让策展人员关闭,一旦我将策展人移出try-with-resources语句并在SinkTask的stop方法中关闭它,一切正常。

干杯!