
Spark + Kafka streaming NoClassDefFoundError kafka/serializer/StringDecoder

I am trying to send messages from my Kafka producer and stream them with Spark Streaming, but when I run my application with spark-submit I get the following error.

Error:

Exception in thread "main" java.lang.NoClassDefFoundError: kafka/serializer/StringDecoder 
     at com.spark_stream.Main.main(Main.java:37) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:498) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: kafka.serializer.StringDecoder 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
     ... 10 more 

The application code is as follows:

Main.java

package com.spark_stream; 

import java.util.Collections; 
import java.util.HashMap; 
import java.util.Map; 
import java.util.Set; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 

public class Main { 

    public static void main(String[] args) { 
     // TODO Auto-generated method stub 

     System.out.println("spark started!"); 

      SparkConf conf = new SparkConf() 
        .setAppName("kafka-sandbox") 
        .setMaster("local[*]"); 
      JavaSparkContext sc = new JavaSparkContext(conf); 
      JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 


      Map<String, String> kafkaParams = new HashMap<String, String>(); 
      kafkaParams.put("metadata.broker.list", "localhost:9092"); 
      Set<String> topics = Collections.singleton("speed"); 

      JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, 
        String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics); 

      directKafkaStream.foreachRDD(rdd -> { 
       System.out.println("--- New RDD with " + rdd.partitions().size() 
         + " partitions and " + rdd.count() + " records"); 
       rdd.foreach(record -> System.out.println(record._2)); 
      }); 

      System.out.println("connection completed"); 


      ssc.start(); 

      ssc.awaitTermination(); 

      System.out.println("spark ended!"); 

    } 

} 

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>com.spark_stream</groupId> 
    <artifactId>com.spark_stream</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 


    <dependencies> 

    <dependency> <!-- Spark dependency --> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.0</version> 
    </dependency> 


</dependencies> 

    <properties> 
     <maven.compiler.source>1.8</maven.compiler.source> 
     <maven.compiler.target>1.8</maven.compiler.target> 
    </properties> 
</project> 

I can't find a solution for this error. Any help would be appreciated.

Answers

2

Take a look at the docs: http://spark.apache.org/docs/latest/submitting-applications.html#launching-applications-with-spark-submit

More specifically, this part:

Path to a bundled jar including your application and all the dependencies.

Your pom.xml clearly shows that the jar you are building does not bundle its dependencies. That is why spark-submit cannot find the class kafka.serializer.StringDecoder.

What you probably want, to solve this kind of problem, is a plugin that packages your dependencies inside your jar; the maven assembly plugin can help you do this.
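As a rough sketch, a build section along these lines could be added to the pom.xml (the plugin version and mainClass shown here are assumptions, adjust them to your project):

<build> 
    <plugins> 
     <plugin> 
      <groupId>org.apache.maven.plugins</groupId> 
      <artifactId>maven-assembly-plugin</artifactId> 
      <version>2.6</version> <!-- assumed version --> 
      <configuration> 
       <descriptorRefs> 
        <!-- bundle all dependencies into a single "fat" jar --> 
        <descriptorRef>jar-with-dependencies</descriptorRef> 
       </descriptorRefs> 
       <archive> 
        <manifest> 
         <!-- entry point of the application --> 
         <mainClass>com.spark_stream.Main</mainClass> 
        </manifest> 
       </archive> 
      </configuration> 
      <executions> 
       <execution> 
        <id>make-assembly</id> 
        <phase>package</phase> 
        <goals> 
         <goal>single</goal> 
        </goals> 
       </execution> 
      </executions> 
     </plugin> 
    </plugins> 
</build> 

With that in place, mvn package should produce an additional *-jar-with-dependencies.jar under target/, and that is the jar to pass to spark-submit.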


Thanks, adding the maven assembly plugin did the job. –

2

It looks like the compiler cannot find the kafka jar, since you have not included it in the pom file. Try adding the following dependency to your pom file, and check which Kafka version you are actually using.

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 --> 
<dependency> 
    <groupId>org.apache.kafka</groupId> 
    <artifactId>kafka_2.10</artifactId> 
    <version>0.8.0</version> 
</dependency> 

The same error still occurs after adding this dependency. –
