2017-08-31 73 views
0

我试图整合火花和卡夫卡消费来自卡夫卡的消息。我有生产者代码也发送关于“temp”主题的消息。另外,我正在使用Kafka的Console Producer来制作“temp”主题的消息。Kafka Spark Streaming Consumer将不会收到来自Kafka Console Producer的任何消息?

我已经创建了下面的代码来使用来自同一个“temp”主题的消息,但它也不会收到单个消息。

计划:

import java.util.Arrays; 
import java.util.Map; 
import java.util.HashMap; 
import static org.apache.commons.lang3.StringUtils.SPACE; 

import org.apache.spark.SparkConf; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 
import scala.Tuple2; 
import org.apache.log4j.Logger; 
import org.apache.spark.api.java.JavaSparkContext; 
import scala.collection.immutable.ListSet; 
import scala.collection.immutable.Set; 

public class ConsumerDemo { 

    public void main() { 
     String zkGroup = "localhost:2181"; 
     String group = "test"; 
     String[] topics = {"temp"}; 
     int numThreads = 1; 

     SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[4]").set("spark.ui.port‌​", "7077").set("spark.executor.memory", "1g"); 
     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
     Map<String, Integer> topicMap = new HashMap<>(); 
     for (String topic : topics) { 
      topicMap.put(topic, numThreads); 
     } 
     System.out.println("topics : " + Arrays.toString(topics)); 
     JavaPairReceiverInputDStream<String, String> messages 
       = KafkaUtils.createStream(jssc, zkGroup, group, topicMap); 

     messages.print(); 

     JavaDStream<String> lines = messages.map(Tuple2::_2); 

     //lines.print(); 
     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); 

     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) 
       .reduceByKey((i1, i2) -> i1 + i2); 

     //wordCounts.print(); 
     jssc.start(); 
     jssc.awaitTermination(); 
    } 

    public static void main(String[] args) { 
     System.out.println("Started..."); 
     new ConsumerDemo().main(); 
     System.out.println("Ended..."); 
    } 
} 

我加入之后在pom.xml文件的依赖性:

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.10</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.11.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.2.0</version> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>0.9.0-incubating</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.3</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.3</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 

    <dependency> 
     <groupId>org.anarres.lzo</groupId> 
     <artifactId>lzo-core</artifactId> 
     <version>1.0.5</version> 
     <type>jar</type> 
    </dependency> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.module</groupId> 
     <artifactId>jackson-module-scala_2.10</artifactId> 
     <version>2.8.2</version> 
    </dependency> 
    <dependency> 
     <groupId>com.msiops.footing</groupId> 
     <artifactId>footing-tuple</artifactId> 
     <version>0.2</version> 
    </dependency> 

是我缺少一些依赖或问题是代码?为什么此代码不会收到任何消息?

+0

您是否可以用基于控制台的消费者消费的消息?如果不是,那么生产者可能会有问题。另外,请检查您的端口号是否正确。我不认为POM应该有任何问题,如果有的话,它不应该允许你建立/编译项目。 –

+0

@ NileshPharate-是的,我可以使用卡夫卡的控制台消费者使用消息,因此我们可以说这个问题与卡夫卡或zookeeper无关,即与我用于控制台方法的相同的IP和端口。 – kit

回答

0

你没有调用你有代码来连接和使用来自Kafka的消息的方法。可以在public static void main()中写入该逻辑,或者调用写入该逻辑的方法。

0

当使用Kafka使用者时,特别是当我们在开发环境中进行测试和调试时,制作者可能会不断地向Kafka发送消息。 在这种情况下,我们需要照顾这个卡夫卡消费者参数auto.offset.reset,它确定是否只读取消费者开始运行后写入主题的新消息?或者从主题

这里开始读的是在Kafka documentation给出的官方解释:

auto.offset.reset
怎么办时,没有初始卡夫卡或者偏移当前偏移不存在任何更多的服务器 上(例如,因为该数据已经被删除):

  1. 最早:自动重置偏移最早偏移
  2. 最新:自动复位偏移到最新偏移
  3. 没有:抛出异常给消费者,如果没有以前的偏移发现消费者的群体
  4. 别的:抛出异常给消费者。

有关如何使用kafkaParams如下KafkaDStream创建示例代码片段:

Map<String,String> kafkaParams = new HashMap<>(); 
    kafkaParams.put("zookeeper.connect", "localhost:2181"); 
    kafkaParams.put("group.id", "test02"); //While you are testing the codein develeopment system, change this groupid each time you run the consumer 
    kafkaParams.put("auto.offset.reset", "earliest"); 
    kafkaParams.put("metadata.broker.list", "localhost:9092"); 
    kafkaParams.put("bootstrap.servers", "localhost:9092"); 
    Map<String, Integer> topics = new HashMap<String, Integer>(); 
    topics.put("temp", 1); 
    StorageLevel storageLevel = StorageLevel.MEMORY_AND_DISK_SER(); 

    JavaPairDStream<String, String> messages = 
     KafkaUtils.createStream(jssc, 
       String.class, 
       String.class, 
       StringDecoder.class, 
       StringDecoder.class, 
       kafkaParams, 
       topics, 
       storageLevel)  
     ; 
    messages.print(); 
+0

@ remisharoon-我只以毫秒为单位获取带时间戳的消息。那是什么意思?以下是示例输出 - ------------------------------------------- 时间:1504785338000毫秒 ------------------------------------------- ------------------------------------------- Time:1504785340000 ms - ------------------------------------------ – kit

+0

@kit,意思是说一个“空DStream”。即。它没有阅读卡夫卡的任何记录。在您开始SparkStreming作业 –

+0

@ remisharoon-之后,请尝试写信给Kafka主题。我正在从kafka的控制台制作人发送消息给同一个kafka主题...仍然是在打印空的DStream ...这是什么原因? – kit

相关问题