I'm running into problems deploying and running the word count example for Spark Streaming. I'm trying to deploy the same file that ships with the Spark examples, but I want to build and deploy this particular example as a standalone application.

My file looks like this:

package test;

import scala.Tuple2;
import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.util.regex.Pattern;

public final class JavaNetworkWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        if (args.length < 2) {
            System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
            System.exit(1);
        }

        // Create the context with a 1 second batch size
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc =
            new JavaStreamingContext(sparkConf, Durations.seconds(1));

        // Create a JavaReceiverInputDStream on the target ip:port and count the
        // words in the input stream of \n-delimited text (e.g. generated by 'nc').
        // Skipping replication in the storage level is fine only when running
        // locally; replication is needed for fault tolerance when distributed.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
            args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(
            new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String x) {
                    return Lists.newArrayList(SPACE.split(x));
                }
            });

        // Map each word to a (word, 1) pair, then sum the counts per word
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
            new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(
            new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });

        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

My POM looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.tester</groupId>
  <artifactId>streamer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>streamer</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>1.4.1</spark.version>
    <java.version>1.7</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement>
      <plugins>
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <configuration>
            <archive>
              <manifest>
                <mainClass>test.JavaNetworkWordCount</mainClass>
              </manifest>
            </archive>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>

The error I get is:

java.lang.NoClassDefFoundError: com/google/common/collect/Lists 

I looked through the jar I built with Maven. Its name carries the with-dependencies suffix, but it doesn't actually seem to contain any of the dependencies. I ran the build via mvn assembly:single. What am I doing wrong?
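
In case it helps, this is how I looked inside the jar (the path assumes Maven's default target/ directory):

# list the jar's entries and look for the Guava classes;
# this prints nothing, which matches the NoClassDefFoundError
jar tf target/streamer-0.0.1-SNAPSHOT-jar-with-dependencies.jar | grep com/google/common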

Could you show how you're submitting the job... with spark-submit? –

./bin/spark-submit --class io.tester.JavaNetworkWordCount --master local[2] /tmp/streamer-0.0.1-SNAPSHOT.jar localhost 9999 – james

Answers

As the maven-assembly-plugin documentation says:

"If your project wants to package your artifact in an uber-jar, the assembly plugin provides only basic support. For more control, use the Maven Shade Plugin."

You can try the maven-shade-plugin instead. Try replacing your maven-assembly-plugin <plugin> block with:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <!-- Run the shade goal on the package phase -->
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <!-- add Main-Class to the manifest file -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>test.JavaNetworkWordCount</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

That should produce a fat jar containing all of your dependencies.
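
Roughly, to try it end to end (a sketch; the jar path assumes Maven's default target/ directory and that the shaded jar replaces the plain artifact, which is the shade plugin's default):

# build the fat jar; the shade goal is bound to the package phase above
mvn clean package

# in one terminal, serve \n-delimited text for the receiver
nc -lk 9999

# in another terminal, submit the job (host/port as in your comment)
./bin/spark-submit --class test.JavaNetworkWordCount --master "local[2]" \
  target/streamer-0.0.1-SNAPSHOT.jar localhost 9999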

Just tried this. It didn't work. I also tried the assembly plugin; that didn't work for me either. The with-dependencies jar gets built, but it has no dependencies in it, just my own classes. Could it have something to do with Scala? Also, is there a way to submit to Spark without an executable jar? Thanks in advance for the help – james

I figured it out. I had two problems. First, I hadn't noticed the provided scope on my dependencies (a silly cut-and-paste error). The second problem was that one of the dependencies was signed, and I had to explicitly exclude the signature files. In case anyone else runs into this, my final POM looks like this:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>io.tester</groupId>
  <artifactId>streamer</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>streamer</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <scala.binary.version>2.11</scala.binary.version>
    <spark.version>1.4.1</spark.version>
    <java.version>1.7</java.version>
  </properties>

  <dependencies>
    <!-- no more <scope>provided</scope>, so Spark and its transitive
         dependencies end up in the shaded jar -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
      <version>${spark.version}</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
          <!-- Run the shade goal on the package phase -->
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <filters>
                <!-- strip signature files from signed dependencies, otherwise
                     the merged jar fails signature verification -->
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <!-- add Main-Class to the manifest file -->
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>io.tester.streamer.JavaNetworkWordCount</mainClass>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>
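
For completeness, this is how I verified the fix (paths assume Maven defaults; the shaded jar replaces the plain artifact):

mvn clean package

# the Guava class from the original error now shows up inside the jar
jar tf target/streamer-0.0.1-SNAPSHOT.jar | grep com/google/common/collect/Lists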