我试图搭建一个卡夫卡风暴“Hello World”系统。我已经安装并运行了Kafka,当我向卡夫卡制作人发送数据时,我可以通过卡夫卡控制台消费者阅读。KafkaSpout没有收到来自卡夫卡的任何东西
我从“风暴入门”O'Reilly的书中学习了第02章的例子,并将其修改为使用KafkaSpout而不是普通的喷嘴。
当我运行应用程序时,数据已经在kafka中挂起,KafkaSpout的nextTuple没有得到任何消息 - 它进入,试图迭代协调器下的空管理器列表,然后退出。
我的环境是一个相当古老的Cloudera虚拟机,有Storm 0.9和Kafka-Storm-0.9(最新版本),以及Kafka 2.9.2-0.7.0。
我这是怎么定义的SpoutConfig和拓扑结构:
String zookeepers = "localhost:2181";
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"),
"gtest",
"/kafka", // zookeeper root path for offset storing
"KafkaSpout");
spoutConfig.forceStartOffsetTime(-1);
KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig);
//Topology definition
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word-reader", kafkaSpout, 1);
builder.setBolt("word-normalizer", new WordNormalizer())
.shuffleGrouping("word-reader");
builder.setBolt("word-counter", new WordCounter(),1)
.fieldsGrouping("word-normalizer", new Fields("word"));
//Configuration
Config conf = new Config();
conf.put("wordsFile", args[0]);
conf.setDebug(false);
//Topology run
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
cluster = new LocalCluster();
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology());
是否有人可以帮助我弄清楚,为什么我没有收到什么?
感谢, G.