2013-07-23 29 views
3

我试图搭建一个卡夫卡风暴“Hello World”系统。我已经安装并运行了Kafka,当我向卡夫卡制作人发送数据时,我可以通过卡夫卡控制台消费者阅读。KafkaSpout没有收到来自卡夫卡的任何东西

我从“风暴入门”O'Reilly的书中学习了第02章的例子,并将其修改为使用KafkaSpout而不是普通的喷嘴。

当我运行应用程序时,数据已经在kafka中挂起,KafkaSpout的nextTuple没有得到任何消息 - 它进入,试图迭代协调器下的空管理器列表,然后退出。

我的环境是一个相当古老的Cloudera虚拟机,有Storm 0.9和Kafka-Storm-0.9(最新版本),以及Kafka 2.9.2-0.7.0。

我这是怎么定义的SpoutConfig和拓扑结构:

String zookeepers = "localhost:2181"; 

SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"), 
     "gtest", 
     "/kafka", // zookeeper root path for offset storing 
     "KafkaSpout"); 
spoutConfig.forceStartOffsetTime(-1); 

KafkaSpoutTester kafkaSpout = new KafkaSpoutTester(spoutConfig); 


//Topology definition 
TopologyBuilder builder = new TopologyBuilder(); 
builder.setSpout("word-reader", kafkaSpout, 1); 
builder.setBolt("word-normalizer", new WordNormalizer()) 
    .shuffleGrouping("word-reader"); 
builder.setBolt("word-counter", new WordCounter(),1) 
    .fieldsGrouping("word-normalizer", new Fields("word")); 

//Configuration 
Config conf = new Config(); 
conf.put("wordsFile", args[0]); 
conf.setDebug(false); 
//Topology run 
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); 
cluster = new LocalCluster(); 
cluster.submitTopology("Getting-Started-Toplogie", conf, builder.createTopology()); 

是否有人可以帮助我弄清楚,为什么我没有收到什么?

感谢, G.

回答

0
SpoutConfig spoutConfig = new SpoutConfig(new SpoutConfig.ZkHosts(zookeepers, "/brokers"), 
     "gtest", // name of topic used by producer & consumer 
     "/kafka", // zookeeper root path for offset storing 
     "KafkaSpout"); 

您正在使用 “GTEST” 主题接收数据。确保你是由制作人从这个主题发送数据。

而在该元组这样的

public void execute(Tuple tuple, BasicOutputCollector collector) { 
     System.out.println(tuple); 
    } 

应该在卡夫卡打印挂起的数据螺栓,打印。

4

如果您已经使用了该消息,则不会再读取该消息,除非您的生产者生成新消息。这是因为您的代码中有forceStartOffsetTime呼叫-1

形式storm-contrib文档:

在喷口另一个非常有用的配置是迫使喷口倒回到先前偏移的能力。你做forceStartOffsetTime在喷水口的配置,例如:

spoutConfig.forceStartOffsetTime(-2); 

它会选择最新的偏移周围的时间戳写入启动消费。您可以通过传入-1强制喷口始终从最新的偏移量开始,并且可以通过传入-2来强制它从最早的偏移量开始。

生产者看起来如何?有一个片段是有用的。你可以用-2替换-1,看看你是否收到任何东西,如果你的制作者没问题,那么你应该可以使用。

0

我经历了一场风波和卡夫卡的融合。这些都是快速移动和相对年轻的项目,因此可能很难获得工作示例来启动您的开发。为了帮助其他开发者(并希望其他开发者贡献我可以使用的有用示例),我开始了一个github项目,以存放与Storm/Kafka(和Esper)开发相关的代码片段。

欢迎您来看看这里> https://github.com/buildlackey/cep

(点击的示例程序,应该让你和运行风暴+卡夫卡目录)。

1
SpoutConfig spoutConf = new SpoutConfig(...) 
spoutConf.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); 
相关问题