4

试图编写一个消费卡夫卡消息的Spark Streaming作业。这里是我到目前为止:Spark Streaming中的卡夫卡消费者

1)开始动物园管理员。
2)启动Kafka服务器。
3)发送几条消息到服务器。我可以看到他们,当我运行以下命令:

斌/ kafka-console-consumer.sh --zookeeper本地主机:2181 --topic mytopic --from-开始

4)现在想写一个程序来计算在5分钟内进入的消息数量。

的代码看起来是这样的:

Map<String, Integer> map = new HashMap<String, Integer>(); 
    map.put("mytopic", new Integer(1)); 

    JavaStreamingContext ssc = new JavaStreamingContext(
      sparkUrl, " Spark Streaming", new Duration(60 * 5 * 1000), sparkHome, new String[]{jarFile}); 


    JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "localhost:2181", "1", map); 

不知道什么样的价值要使用的第三个参数(消费群)。当我运行这个时,我得到“无法连接到zookeeper服务器”。但Zookeeper运行在2181端口;否则步骤#3将无法工作。

好像我没有使用正确KafkaUtils.createStream。有任何想法吗?

+0

zookeeper是否与Spark一样运行?您是否试图通过使用 /current/bin/zkCli.sh连接到Zookeeper来验证Zookeeper已启动并运行? – 2014-11-04 00:59:41

+0

我好蠢!我改变了'localhost'到实际的机器名称并且通过了这个错误。但 - 它还没有工作。任何人都知道卡夫卡下的“消费群体”的“默认”价值是什么?它似乎没有消耗任何消息。 – DilTeam 2014-11-04 06:35:52

+0

我面临同样的问题,我没有收到生产者的任何消息。我正在使用Python生产者。而且我也可以从控制台用户那里获得msg。 numofparitions在我的配置中也是1。 @DilTeam你是如何解决这个问题的? – Knight71 2015-09-15 11:50:37

回答

2

没有默认的消费者组。你可以在那里使用任意的非空字符串。如果你只有一个消费者,它的消费者群体并不重要。如果有两个或更多的消费者,他们可以是同一消费群体的一部分,或属于不同的消费群体。

http://kafka.apache.org/documentation.html

Consumers

...

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

我觉得问题可能出在 '主题' 参数。 从Spark docs

Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread

你只指定了单个分区你的主题,即 '1'。根据代理的设置(num.partitions),可能会有多个分区,并且您的消息可能会发送到其他分区,而这些分区不会被程序读取。

此外,我相信part​​itionIds是基于0的。所以如果你只有一个分区,它的ID将等于0.

+1

如您所建议的那样,不确定分区ID是否为0。当我使用: map.put(“mytopic”,new Integer(0)); 我得到这个错误: 错误ReceiverTracker:注销接收器的流0:错误启动接收器0 - java.lang.AssertionError:断言失败 – DilTeam 2014-11-05 00:12:07

+0

不应该跟随代码打印一些东西? JavaDStream 状态= tweets.map( 新功能<字符串,字符串>(){ 公共字符串呼叫(字符串状态){ 的System.out.println(状态); 返回状态; } } ); – DilTeam 2014-11-05 00:14:35

-2

我认为,在你的代码中,调用 KafkaUtils.createStream的第二个参数应该是kafka服务器的host:port,不是动物园管理员的主机和端口。检查一次。

编辑: Kafka Utils API Documentation

按照上述文档,它应该是动物园管理员仲裁。所以应该使用Zookeeper主机名和端口。

zkQuorum 动物园管理员仲裁(主机名:端口,主机名:端口,..)。

+0

如果我给主机:端口,则连接失败。它只是动物园管理员的主机和端口。 – Knight71 2015-09-15 11:52:02

+0

它只是动物园管理员的名单,即zookeeperQorum – 2016-03-09 11:21:19

0

我想你应该指定的动物园管理员,而不是本地主机的IP地址。另外,第三个参数是针对消费者组的名称。它可以是任何你喜欢的名字。当你有多个消费者与同一组绑定时,主题分区会相应地分配。您的推文应该是:

JavaPairReceiverInputDStream tweets = KafkaUtils.createStream(ssc, "x.x.x.x", "dummy-group", map); 
0

我正面临同样的问题。这是为我工作的解决方案。

  • 分配给Spark Streaming应用程序的内核数量必须多于接收器的数量。否则系统会收到数据,但无法处理它。所以Spark Streaming需要至少两个内核。所以在我的火花提交中,我应该提到至少两个核心。
  • kafka-clients-version.jar应该包含在spark-submit中相关jar的列表中。
0

如果zookeeper与流应用程序在同一台计算机上运行,​​那么“localhost:2181”将起作用。否则,您必须提及zookeeper正在运行的主机的地址,并确保运行流应用程序的计算机能够与端口2181上的zookeeper主机通信。

+0

不要发表评论为答案。这应该是评论 – ketan 2016-05-25 06:12:51