1

下面是我在本地的设置:三台虚拟机(使用Virtualbox),kafka和zookeeper安装在这三台服务器上。他们都在互相交谈。如何连接Virtualbox上的kafka群集?

我想从本地使用kafka-console-producer,它需要代理列表。我提供我的虚拟机的IP,但它似乎并没有工作。我也尝试过虚拟机上的advertised.host属性,但对我没有任何影响。这里是我的server.properties从三台机器:

服务器1:

broker.id=4 
port=9092 
host.name=10.30.3.4 
advertised.host.name=10.30.3.4 
advertised.port=9092 
zookeeper.connect=10.30.3.4:2181 
zookeeper.connection.timeout.ms=6000 

服务器2:

broker.id=3 
port=9092 
host.name=10.30.3.3 
advertised.host.name=10.30.3.3 
advertised.port=9092 
zookeeper.connect=10.30.3.3:2181 
zookeeper.connection.timeout.ms=6000 

服务器3:

broker.id=2 
port=9092 
host.name=10.30.3.2 
advertised.host.name=10.30.3.2 
advertised.port=9092 
zookeeper.connect=10.30.3.2:2181 
zookeeper.connection.timeout.ms=6000 

我的VirtualBox也具有端口转发设置: enter image description here 类似地,对于其他两台机器的端口也只是稍微调整了一下。

我能够连接到饲养员就好了,所以:

bin/zkCli.sh -server 127.0.0.1:9999 

是能够连接到ZooKeeper的虚拟机上。但如果我尝试连接卡夫卡控制台制片失败当我尝试发送邮件:

bin/kafka-console-producer.sh --broker-list 127.0.0.1:9502 --topic partition2replica2 --timeout 3000 

导致:

[2016-02-17 15:06:36,943] WARN Property topic is not valid (kafka.utils.VerifiableProperties) 
hi 
there 
[2016-02-17 15:07:23,699] WARN Failed to send producer request with correlation id 3 to broker 3 with data for partitions [partition2replica2,1] (kafka.producer.async.DefaultEventHandler) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101) 
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255) 
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106) 
    at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) 
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
    at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100) 
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72) 
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105) 
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88) 
    at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68) 
    at scala.collection.immutable.Stream.foreach(Stream.scala:547) 
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67) 
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45) 
[2016-02-17 15:07:25,318] WARN Failed to send producer request with correlation id 7 to broker 3 with data for partitions [partition2replica2,1] (kafka.producer.async.DefaultEventHandler) 
java.nio.channels.ClosedChannelException 
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100) 
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73) 
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) 
    at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) 
    at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102) 
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
    at kafka.producer.SyncProducer.send(SyncProducer.scala:101) 
    at kafka.producer.async.DefaultEventHandler.kafka$producer$async 

不知道我在做什么错在这里?有任何想法吗? (如果有人愿意,我可以提供ifconfig输出)。任何帮助将不胜感激。

[编辑1]:添加动物园管理员法定人数的输出:

That seems to be in quorum: 
echo stat| nc 10.30.3.2 2181 
Received: 81 
Sent: 80 
Connections: 1 
Outstanding: 0 
Mode: follower 
Node count: 149 

echo stat| nc 10.30.3.3 2181 
Received: 660 
Sent: 664 
Connections: 1 
Outstanding: 0 
Zxid: 0x600000109 
Mode: leader 
Node count: 149 

echo stat| nc 10.30.3.4 2181 
Received: 293 
Sent: 295 
Connections: 1 
Outstanding: 0 
Zxid: 0x600000109 
Mode: follower 
Node count: 149 

回答

0

至于我能理解你的设置中,每个节点上的饲养员也应该已经在仲裁相互支持3个卡夫卡服务器实例作为一个集群。你只提供了kafka配置,所以我不能确定它们是否以这种方式配置。

您可以通过使用4信命令各饲养员节点上像下面

echo stat | nc <zk ip> <zk port> 
echo mntr | nc <zk ip> <zk port> 

每个人都应该是一个“领头羊”和另外两个应该是“追随者”检查。

我不确定他们是否按预期工作,如果他们没有配置为法定人数。

+0

只是在主要问题中添加了这些命令的输出。我认为这个问题可能与kafka上的广告端口有关,以及如何使用虚拟机访问它。设置简单的端口转发后,我成功地从本地连接到VM上的zookeeper(正如我在原始文章中提到的)。 –

+0

明白了。我不确定问题是由于VirtualBox上的Kafka或端口转发配置造成的;据我所知,客户端将在advertised.host.name和advertised.port下面提供IP和端口,这实际上是10.30.3.X/9092。现在,当您尝试从主机连接时,这些不同。可能需要对端口转发规则进行一些更正,并在广告*选项中进行适当的更改。你可以尝试在广告中使用HOSTIP/PORT吗?主机和advertise.port和测试? – Mehul

+0

因此,我试着在转发规则中添加10.30.3。*或10.0.2.15,并尝试使用127.0.0.9502,它赢得了工作。对不起,经纪人列表中的127.0.0.1:5656,我创建了一个隧道,从这里到当时我在端口转发中使用的任何地方 - 这也不起作用:( –