2017-02-05 125 views
0

使用Kafka Spark-Streaming。能够读取和处理从Producer发送的数据。我在这里有一个场景,让我们假设Producer正在生成消息,并且Consumer关闭了一段时间并打开了。 Conumser现在只读取实时数据。相反,它应该保留停止阅读的地方的数据。 这是我一直在使用的pom.xml。Kafka Spark-Streaming偏移问题

<properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <spark.version>2.0.1</spark.version> 
     <kafka.version>0.8.2.2</kafka.version> 
    </properties> 


    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-kafka_2.11</artifactId> 
      <version>1.6.2</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> 
     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka_2.11</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>${kafka.version}</version> 
     </dependency> 

     <dependency> 
      <groupId>org.scala-lang</groupId> 
      <artifactId>scala-library</artifactId> 
      <version>2.11.1</version> 
     </dependency> 

     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.11</artifactId> 
      <version>${spark.version}</version> 
      <scope>provided</scope> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json/json --> 
     <dependency> 
      <groupId>org.json</groupId> 
      <artifactId>json</artifactId> 
      <version>20160810</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.json4s/json4s-ast_2.11 --> 
     <dependency> 
      <groupId>org.json4s</groupId> 
      <artifactId>json4s-ast_2.11</artifactId> 
      <version>3.2.11</version> 
     </dependency> 

     <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> 
     <dependency> 
      <groupId>org.apache.hadoop</groupId> 
      <artifactId>hadoop-common</artifactId> 
      <version>2.2.0</version> 
     </dependency> 

我尝试过使用Kafka-v0.10.1.0 Producer和Conumser。行为与预期的一样(消费者从​​其离开的地方读取数据)。所以,在这个版本中偏移量被正确拾取。

已经尝试过在上面的pom.xml中使用相同的版本,但是失败的结果是java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker

我明白版本的兼容性,但我也在寻找连续流。

+0

你看看我的答案?面临的问题得到了解决? – oh54

回答

0

不同的行为可能是由于卡夫卡在版本0.8和0.10之间经历了一些相当大的变化。

除非你绝对要使用旧版本,我建议切换到较新的。

这个链接看看:

https://spark.apache.org/docs/latest/streaming-kafka-integration.html

卡夫卡项目推出0.8版本和0.10之间的新的消费API,所以有可用的2个独立的相应星火流包。

如果你想使用卡夫卡v0.10.1.0,你必须这样在 https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11指定一些卡夫卡火花流集成的依赖。

事情是这样的,例如:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>2.1.0</version> 
    </dependency> 

附加说明:您正在使用Hadoop 2.2.0被发布10月,2013,因此在Hadoop中的术语古代,你应该考虑将其更改为一个较新的版本。

让我知道这是否有帮助。