试图编写一个消费卡夫卡消息的Spark Streaming作业。这里是我到目前为止:Spark Streaming中的卡夫卡消费者
1)开始动物园管理员。
2)启动Kafka服务器。
3)发送几条消息到服务器。我可以看到他们,当我运行以下命令:
斌/ kafka-console-consumer.sh --zookeeper本地主机:2181 --topic mytopic --from-开始
4)现在想写一个程序来计算在5分钟内进入的消息数量。
的代码看起来是这样的:
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("mytopic", new Integer(1));
JavaStreamingContext ssc = new JavaStreamingContext(
sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile});
JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map);
不知道什么样的价值要使用的第三个参数(消费群)。当我运行这个时,我得到“无法连接到zookeeper服务器”。但Zookeeper运行在2181端口;否则步骤#3将无法工作。
好像我没有使用正确KafkaUtils.createStream。有任何想法吗?
zookeeper是否与Spark一样运行?您是否试图通过使用 /current/bin/zkCli.sh连接到Zookeeper来验证Zookeeper已启动并运行? –
2014-11-04 00:59:41
我好蠢!我改变了'localhost'到实际的机器名称并且通过了这个错误。但 - 它还没有工作。任何人都知道卡夫卡下的“消费群体”的“默认”价值是什么?它似乎没有消耗任何消息。 – DilTeam 2014-11-04 06:35:52
我面临同样的问题,我没有收到生产者的任何消息。我正在使用Python生产者。而且我也可以从控制台用户那里获得msg。 numofparitions在我的配置中也是1。 @DilTeam你是如何解决这个问题的? – Knight71 2015-09-15 11:50:37