下面是我在本地的设置:三台虚拟机(使用Virtualbox),kafka和zookeeper安装在这三台服务器上。他们都在互相交谈。如何连接Virtualbox上的kafka群集?
我想从本地使用kafka-console-producer,它需要代理列表。我提供我的虚拟机的IP,但它似乎并没有工作。我也尝试过虚拟机上的advertised.host属性,但对我没有任何影响。这里是我的server.properties从三台机器:
服务器1:
broker.id=4
port=9092
host.name=10.30.3.4
advertised.host.name=10.30.3.4
advertised.port=9092
zookeeper.connect=10.30.3.4:2181
zookeeper.connection.timeout.ms=6000
服务器2:
broker.id=3
port=9092
host.name=10.30.3.3
advertised.host.name=10.30.3.3
advertised.port=9092
zookeeper.connect=10.30.3.3:2181
zookeeper.connection.timeout.ms=6000
服务器3:
broker.id=2
port=9092
host.name=10.30.3.2
advertised.host.name=10.30.3.2
advertised.port=9092
zookeeper.connect=10.30.3.2:2181
zookeeper.connection.timeout.ms=6000
我的VirtualBox也具有端口转发设置: 类似地,对于其他两台机器的端口也只是稍微调整了一下。
我能够连接到饲养员就好了,所以:
bin/zkCli.sh -server 127.0.0.1:9999
是能够连接到ZooKeeper的虚拟机上。但如果我尝试连接卡夫卡控制台制片失败当我尝试发送邮件:
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9502 --topic partition2replica2 --timeout 3000
导致:
[2016-02-17 15:06:36,943] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hi
there
[2016-02-17 15:07:23,699] WARN Failed to send producer request with correlation id 3 to broker 3 with data for partitions [partition2replica2,1] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
at scala.collection.immutable.Stream.foreach(Stream.scala:547)
at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-02-17 15:07:25,318] WARN Failed to send producer request with correlation id 7 to broker 3 with data for partitions [partition2replica2,1] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
at kafka.producer.async.DefaultEventHandler.kafka$producer$async
不知道我在做什么错在这里?有任何想法吗? (如果有人愿意,我可以提供ifconfig输出)。任何帮助将不胜感激。
[编辑1]:添加动物园管理员法定人数的输出:
That seems to be in quorum:
echo stat| nc 10.30.3.2 2181
Received: 81
Sent: 80
Connections: 1
Outstanding: 0
Mode: follower
Node count: 149
echo stat| nc 10.30.3.3 2181
Received: 660
Sent: 664
Connections: 1
Outstanding: 0
Zxid: 0x600000109
Mode: leader
Node count: 149
echo stat| nc 10.30.3.4 2181
Received: 293
Sent: 295
Connections: 1
Outstanding: 0
Zxid: 0x600000109
Mode: follower
Node count: 149
只是在主要问题中添加了这些命令的输出。我认为这个问题可能与kafka上的广告端口有关,以及如何使用虚拟机访问它。设置简单的端口转发后,我成功地从本地连接到VM上的zookeeper(正如我在原始文章中提到的)。 –
明白了。我不确定问题是由于VirtualBox上的Kafka或端口转发配置造成的;据我所知,客户端将在advertised.host.name和advertised.port下面提供IP和端口,这实际上是10.30.3.X/9092。现在,当您尝试从主机连接时,这些不同。可能需要对端口转发规则进行一些更正,并在广告*选项中进行适当的更改。你可以尝试在广告中使用HOSTIP/PORT吗?主机和advertise.port和测试? – Mehul
因此,我试着在转发规则中添加10.30.3。*或10.0.2.15,并尝试使用127.0.0.9502,它赢得了工作。对不起,经纪人列表中的127.0.0.1:5656,我创建了一个隧道,从这里到当时我在端口转发中使用的任何地方 - 这也不起作用:( –