
So, I previously asked some questions here about how to query Cassandra via Spark in a Java Maven project: Querying Data in Cassandra via Spark in a Java Maven Project, and RDD not serializable with the Cassandra/Spark connector Java API.

My questions were answered and it worked, but now I have run into another problem (possibly an issue). I am trying to use the DataStax Java API. Here is my code:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App
{
    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);
        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
    }

    public static void main(String[] args)
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

And here is my error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Now, I know exactly where my error is. It is System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));, because I need to convert the rdd to an array. However, the API documentation SAID I should be able to do this... this code is copied and pasted straight from the documentation. Why can't I serialize the RDD to an array?
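(Editor's note, not from the original post: a minimal sketch of the more usual way to bring a small RDD back to the driver in the Java API. collect() returns a java.util.List, which StringUtils.join can then format; it assumes the rdd from createSchema() above and that the serialization problem discussed in the answer below has been fixed.)

    import java.util.List;
    import org.apache.commons.lang3.StringUtils;

    // collect() materializes the RDD's contents on the driver as a List.
    List<String> rows = rdd.collect();
    // Note the argument order: StringUtils.join takes (iterable, separator).
    System.out.println("Data as Person beans:\n" + StringUtils.join(rows, "\n"));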

I have already inserted dummy data into my Cassandra; that is covered in my earlier posts, linked above.

Also, I fixed an earlier error by changing all of my getters and setters to lowercase. When I used capital letters in them, it produced an error. Why can't I use capitals in my getters and setters?
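(Editor's aside, not from the original thread: a common cause of this kind of mapping error is a mismatch between the Cassandra column names and the JavaBean property names derived from the accessors. A hypothetical diagnostic sketch using java.beans.Introspector and the Person bean above, to show which property names Java actually derives:)

    import java.beans.BeanInfo;
    import java.beans.Introspector;
    import java.beans.PropertyDescriptor;

    public class BeanPropertyCheck {
        public static void main(String[] args) throws Exception {
            // Print the property names derived from Person's accessors;
            // e.g. both getFname() and getfname() yield the property "fname".
            BeanInfo info = Introspector.getBeanInfo(App.Person.class, Object.class);
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                System.out.println(pd.getName() + " -> " + pd.getReadMethod());
            }
        }
    }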

Thanks, Angel

Answer


Changing public class App to public class App implements Serializable should fix this error. Because a Java inner class keeps a reference to its outer class, your Function object has a reference to App, and since Spark needs to serialize your Function object, it requires that App be serializable as well.
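(A minimal sketch of the fix, plus a common alternative that is not from this thread. A static nested Function captures no reference to the enclosing App instance at all, so App itself no longer needs to be serializable; note that in Spark 1.x Function is an interface, while in 0.9 it was an abstract class you would extend instead.)

    // Fix from the answer: make the enclosing class serializable.
    public class App implements Serializable {
        // ... rest of the class unchanged ...
    }

    // Alternative (hypothetical, Spark 1.x style): a static nested class
    // holds no implicit reference to the enclosing App instance, so only
    // the function itself needs to be serialized.
    public static class PersonToString implements Function<Person, String> {
        @Override
        public String call(Person person) {
            return person.toString();
        }
    }
    // ... then in createSchema(): .map(new PersonToString()) ...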


Thanks! That worked. Do you have any insight into why this statement doesn't work? 'JavaRDD rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class).where("role=?", "IT Engineer").map(new Function ...' If I leave the .where() in, it gives me an error, but if I remove it and keep just the .map, the whole thing works fine. According to the documentation, .where should work – angyxpoo 2014-09-23 15:29:42


What is the error message? – zsxwing 2014-09-24 06:27:32


I posted another question about this, and it has already been answered: http://stackoverflow.com/questions/26001566/spark-datastax-java-api-select-statements/26020947#26020947. Thanks! – angyxpoo 2014-09-24 15:39:03
