
I am trying to port an example written in Scala (from the Apache Spark project) to Java and am running into problems: how do I use Spark's .newAPIHadoopRDD() from Java?

The code from the original Scala example

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
    classOf[CqlPagingInputFormat], 
    classOf[java.util.Map[String,ByteBuffer]], 
    classOf[java.util.Map[String,ByteBuffer]]) 

builds and runs fine, but

JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(), 
    CqlPagingInputFormat.class, 
    java.util.Map<String, ByteBuffer>.class, 
    java.util.Map<String, ByteBuffer>.class); 

is not allowed in Java (Cannot select from parameterized type).

Changing

java.util.Map<String, ByteBuffer>.class 

to

Class.forName("java.util.Map<String, ByteBuffer>") 

produces a new error:

Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?> 

Changing it to plain java.util.Map.class produces a similar error:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map> 

So what is the correct translation? It is worth noting that newAPIHadoopRDD() has separate implementations for Scala and for Java. Documentation for these methods can be found here, and here: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration,java.lang.Class,java.lang.Class,java.lang.Class).
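For reference, the Java-side signature from the linked Javadoc is roughly:

<K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>>
JavaPairRDD<K, V> newAPIHadoopRDD(Configuration conf, Class<F> fClass, Class<K> kClass, Class<V> vClass)

so the kClass/vClass arguments have to line up exactly with the type parameters of the InputFormat.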

The CqlPagingInputFormat declaration looks like this:

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> { 

Have you tried 'java.util.Map.class' instead of 'java.util.Map<String, ByteBuffer>.class'? –


Yes, I should have added that. I'll post the errors in the original question, thanks. – martingms

Answer


I finally got it solved after a lot of fighting. The problem is that newAPIHadoopRDD requires a class that extends org.apache.hadoop.mapreduce.InputFormat, and org.apache.cassandra.hadoop.cql3.CqlInputFormat does not extend InputFormat directly; instead it extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, which in turn extends InputFormat.

Eclipse uses the Groovy compiler, which is smart enough to resolve this, but Java's default compiler cannot. The Groovy compiler also correctly resolves the K,V value incompatibility that the Java compiler complains about.
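A workaround that gets suggested for this kind of bound mismatch under plain javac is to cast the raw class literals to the parameterized Class type. This is only a sketch reusing the jsc and job objects from the question, and I have not verified it against this exact Spark/Cassandra combination:

// Unchecked cast of the raw Map.class literal to the parameterized Class type.
@SuppressWarnings("unchecked")
Class<Map<String, ByteBuffer>> kvClass =
    (Class<Map<String, ByteBuffer>>) (Class<?>) Map.class;

// With kClass/vClass now carrying the same type parameters as CqlPagingInputFormat,
// the bound F extends InputFormat<K, V> can be satisfied.
JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> casRdd =
    jsc.newAPIHadoopRDD(job.getConfiguration(),
        CqlPagingInputFormat.class,
        kvClass,
        kvClass);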

You need to add the following changes to the pom.xml file to use the Groovy compiler:

<properties> 
    <groovy-version>1.8.6</groovy-version> 
    <maven-compiler-plugin-version>2.5.1</maven-compiler-plugin-version> 
    <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version> 
    <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version> 
    <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version> 
</properties> 
  1. Add the Groovy dependency

    <dependencies> 
        <dependency> 
         <groupId>org.codehaus.groovy</groupId> 
         <artifactId>groovy-all</artifactId> 
         <version>${groovy-version}</version> 
        </dependency> 
    </dependencies> 
    
  2. Add the Groovy plugin under build to use it as the compiler for our code

    <build> 
        <pluginManagement> 
         <plugins> 
         <plugin> 
          <groupId>org.apache.maven.plugins</groupId> 
          <artifactId>maven-compiler-plugin</artifactId> 
          <version>${maven-compiler-plugin-version}</version> 
          <configuration> 
           <!-- Bind Groovy Eclipse Compiler --> 
           <compilerId>groovy-eclipse-compiler</compilerId> 
           <source>${jdk-version}</source> 
           <target>${jdk-version}</target> 
          </configuration> 
          <dependencies> 
           <!-- Define which Groovy version will be used for build (default is 
            2.0) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-batch</artifactId> 
            <version>${groovy-eclipse-batch-version}</version> 
           </dependency> 
           <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
            in compilerId) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-compiler</artifactId> 
            <version>${groovy-eclipse-compiler-version}</version> 
           </dependency> 
          </dependencies> 
         </plugin> 
         <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
          to this, plugin will --> 
         <!-- enhance default build life cycle with an extra phase which adds 
          additional Groovy source folders --> 
         <!-- It works fine under Maven 3.x, but we've encountered problems with 
          Maven 2.x --> 
         <plugin> 
          <groupId>org.codehaus.groovy</groupId> 
          <artifactId>groovy-eclipse-compiler</artifactId> 
          <version>${groovy-eclipse-compiler-version}</version> 
          <extensions>true</extensions> 
         </plugin> 
         <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
          to any execution phase, --> 
         <!-- so you'll have to call Clover goals from command line. --> 
         <plugin> 
          <groupId>com.atlassian.maven.plugins</groupId> 
          <artifactId>maven-clover2-plugin</artifactId> 
          <version>${maven-clover2-plugin-version}</version> 
          <configuration> 
           <generateHtml>true</generateHtml> 
           <historyDir>.cloverhistory</historyDir> 
          </configuration> 
         </plugin> 
         </plugins> 
        </pluginManagement> 
    </build> 
    

This should solve it.
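Once it compiles, the resulting pair RDD is used like any other. For instance, a rough sketch of decoding one text column out of each CQL row, where the column name "body" is purely hypothetical and casRdd is the JavaPairRDD<Map<String, ByteBuffer>, Map<String, ByteBuffer>> built from newAPIHadoopRDD:

JavaRDD<String> bodies = casRdd.map(
    new Function<Tuple2<Map<String, ByteBuffer>, Map<String, ByteBuffer>>, String>() {
      @Override
      public String call(Tuple2<Map<String, ByteBuffer>, Map<String, ByteBuffer>> row) throws Exception {
        // Roughly: row._1() holds the partition key columns, row._2() the remaining columns.
        return ByteBufferUtil.string(row._2().get("body"));
      }
    });
System.out.println(bodies.count());

(Function is org.apache.spark.api.java.function.Function, Tuple2 is scala.Tuple2, and ByteBufferUtil is org.apache.cassandra.utils.ByteBufferUtil.)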


Nice! I ended up using Datastax's own [cassandra-driver-spark](https://github.com/datastax/cassandra-driver-spark) (from Scala, wrapping it in Java), which has a much nicer API (Java support is coming soon). Glad to see you figured it out. – martingms
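A minimal sketch of that connector's Java API from a later release (the japi package), assuming purely illustrative keyspace/table names and the jsc from the question:

// Requires: import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
long rows = javaFunctions(jsc).cassandraTable("my_keyspace", "my_table").count();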


Hi, I hit a problem when using JavaSparkContext.newAPIHadoopRDD(conf, XXInputFormat.class, NullWritable.class, Map.class), where my XXInputFormat extends mapreduce.InputFormat directly and uses Map as the InputFormat parameter. – tobe


Error: sc.newAPIHadoopRDD(conf, TestInputFormat.class, NullWritable.class, Map.class); ^^^^^^^^^^^^^^^ Bound mismatch: the generic method newAPIHadoopRDD(Configuration, Class, Class, Class) of type JavaSparkContext is not applicable for the arguments (Configuration, Class, Class, Class). The inferred type TestInputFormat is not a valid substitute for the bounded parameter. – tobe
