2016-12-07 89 views

回答

1

您可以检查服务器是否通过使用此运行:

ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$); 
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster()); 
if (brokers.isEmpty()) { 
    // No brokers available 
} else { 
    // There are brokers available 
} 
+0

这是一种检查Zookeeper连接的方法,而不是Kafka服务器。 – dbustosp

3

我有同样的问题,我不想离开这个问题没有任何答案。 我读了很多关于如何检查连接的信息,并且我发现的大多数答案都是检查与Zk的连接,但我确实想直接使用Kafka服务器检查连接。

我所做的是创建一个简单的KafkaConsumer并列出所有的话题与listTopics()。如果连接成功,那么你会得到一些回报。否则,你会得到一个TimeoutException

def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = { 
    val props = new Properties() 
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString) 
    props.put("group.id", kafkaParams.get("group.id").get.toString) 
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") 
    val simpleConsumer = new KafkaConsumer[String, String](props) 
    simpleConsumer.listTopics() 
    } 

然后你可以将这个方法包装在try-catch句子中来捕捉异常。