Spark Datastax Java API Select Statements

I followed a tutorial in this GitHub repository to run Spark against Cassandra from a Java Maven project: https://github.com/datastax/spark-cassandra-connector

I have already figured out how to use direct CQL statements, since I asked about that previously: Querying Data in Cassandra via Spark in a Java Maven Project

However, now I am trying to use DataStax's Java API, out of concern that the code from my original question will not work with the DataStax versions of Spark and Cassandra. For some odd reason it will not let me use .where, even though the documentation outlines that I can use that exact statement. Here is my code:

import org.apache.commons.lang3.StringUtils; 
import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.JavaRDD; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.api.java.function.Function; 

import java.io.Serializable; 

import static com.datastax.spark.connector.CassandraJavaUtil.*; 


public class App implements Serializable 
{ 

    // firstly, we define a bean class 
    public static class Person implements Serializable { 
     private Integer id; 
     private String fname; 
     private String lname; 
     private String role; 

     // Remember to declare no-args constructor 
     public Person() { } 

     public Integer getId() { return id; } 
     public void setId(Integer id) { this.id = id; } 

     public String getfname() { return fname; } 
     public void setfname(String fname) { this.fname = fname; } 

     public String getlname() { return lname; } 
     public void setlname(String lname) { this.lname = lname; } 

     public String getrole() { return role; } 
     public void setrole(String role) { this.role = role; } 

     // other methods, constructors, etc. 
    } 

    private transient SparkConf conf; 
    private App(SparkConf conf) { 
     this.conf = conf; 
    } 


    private void run() { 
     JavaSparkContext sc = new JavaSparkContext(conf); 
     createSchema(sc); 


     sc.stop(); 
    } 

    private void createSchema(JavaSparkContext sc) { 

     JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class) 
       .where("role=?", "IT Engineer").map(new Function<Person, String>() { 
        @Override 
        public String call(Person person) throws Exception { 
         return person.toString(); 
        } 
       }); 
     System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray())); 
     } 



    public static void main(String[] args) 
    { 
     if (args.length != 2) { 
      System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>"); 
      System.exit(1); 
     } 

     SparkConf conf = new SparkConf(); 
     conf.setAppName("Java API demo"); 
     conf.setMaster(args[0]); 
     conf.set("spark.cassandra.connection.host", args[1]); 

     App app = new App(conf); 
     app.run(); 
    } 
} 

Here is the error:

14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0 
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal 
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310) 
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317) 
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338) 
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608) 
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) 
    at org.apache.spark.scheduler.Task.run(Task.scala:53) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal 
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35) 
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256) 
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91) 
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45) 
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28) 
    at com.sun.proxy.$Proxy8.prepare(Unknown Source) 
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293) 
    ... 27 more 
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal 
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97) 
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156) 
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131) 
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711) 
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849) 
    ... 3 more 
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0) 
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException 
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal 
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310) 
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317) 
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338) 
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338) 
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10) 
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
    at scala.collection.Iterator$class.foreach(Iterator.scala:727) 
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) 
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) 
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) 
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) 
    at scala.collection.AbstractIterator.to(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) 
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) 
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) 
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) 
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608) 
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) 
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109) 
    at org.apache.spark.scheduler.Task.run(Task.scala:53) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job 
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65 
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster 

I know where my error is. Specifically, it is in this section:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class) 
       .where("role=?", "IT Engineer").map(new Function<Person, String>() { 
        @Override 
        public String call(Person person) throws Exception { 
         return person.toString(); 
        } 
       }); 

When I remove the .where(), it works. But it says specifically on GitHub that you should be able to chain the .where and .map functions. Does anyone have any reasoning for this? Or a solution? Thanks.

EDIT: The error I was getting goes away when I use this statement instead:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class) 
       .where("id=?", "1").map(new Function<Person, String>() { 
        @Override 
        public String call(Person person) throws Exception { 
         return person.toString(); 
        } 
       }); 

I don't know why this option works but not the rest of my variations. Here are the statements I ran in CQL, so you know what my keyspace looks like:

session.execute("DROP KEYSPACE IF EXISTS tester"); 
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}"); 
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)"); 
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))"); 
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");  

    session.execute(
       "INSERT INTO tester.emp (id, fname, lname, role) " + 
       "VALUES (" + 
        "0001," + 
        "'Angel'," + 
        "'Pay'," + 
        "'IT Engineer'" + 
        ");"); 
    session.execute(
       "INSERT INTO tester.emp (id, fname, lname, role) " + 
       "VALUES (" + 
        "0002," + 
        "'John'," + 
        "'Doe'," + 
        "'IT Engineer'" + 
        ");"); 
    session.execute(
       "INSERT INTO tester.emp (id, fname, lname, role) " + 
       "VALUES (" + 
        "0003," + 
        "'Jane'," + 
        "'Doe'," + 
        "'IT Analyst'" + 
        ");"); 
    session.execute(
      "INSERT INTO tester.empByRole (id, fname, lname, role) " + 
      "VALUES (" + 
       "0001," + 
       "'Angel'," + 
       "'Pay'," + 
       "'IT Engineer'" + 
       ");"); 
    session.execute(
       "INSERT INTO tester.empByRole (id, fname, lname, role) " + 
       "VALUES (" + 
        "0002," + 
        "'John'," + 
        "'Doe'," + 
        "'IT Engineer'" + 
        ");"); 
    session.execute(
       "INSERT INTO tester.empByRole (id, fname, lname, role) " + 
       "VALUES (" + 
        "0003," + 
        "'Jane'," + 
        "'Doe'," + 
        "'IT Analyst'" + 
        ");"); 
     session.execute(
       "INSERT INTO tester.dept (id, dname) " + 
       "VALUES (" + 
        "1553," + 
        "'Commerce'" + 
        ");"); 

Answers


The where method adds ALLOW FILTERING to your query under the covers. It is not a magic bullet, though, since it still does not support arbitrary fields as query predicates. In general, the field must be either an indexed column or a clustering column. If that is not practical for your data model, you can simply use the filter method on the RDD. The downside is that the filtering then takes place in Spark rather than in Cassandra.

So the id field works because it is supported in a CQL WHERE clause, whereas I assume role is just a regular field. Note that I am not suggesting you index the field or change it to a clustering column, since I do not know your data model.
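
For example, the filter alternative mentioned above could look like the sketch below. This is only a minimal sketch against the same Person bean and table as in the question (the "IT Engineer" value is just the question's example predicate), not a definitive implementation:

    JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class) 
        // No .where(): fetch the rows and apply the predicate on the Spark side. 
        .filter(new Function<Person, Boolean>() { 
         @Override 
         public Boolean call(Person person) throws Exception { 
          return "IT Engineer".equals(person.getrole()); 
         } 
        }) 
        .map(new Function<Person, String>() { 
         @Override 
         public String call(Person person) throws Exception { 
          return person.toString(); 
         } 
        }); 

The trade-off is that every row of the table is read from Cassandra before the predicate is applied, so this is best reserved for fields that CQL cannot restrict.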


That is helpful, but then why can't I use this query with Spark on the empByRole table, where 'role' is my primary key: .where("role=?", "IT Engineer")? – angyxpoo 2014-09-24 15:48:15


There is a limitation in the Spark Cassandra Connector: the where method does not work on partition keys. In your table empByRole, role is the partition key, hence the error. It should work correctly on clustering columns or indexed columns (secondary indexes).

This is tracked as issue 37 in the GitHub project, and work on it has been in progress.

On the Java API documentation page, the example shown uses .where("name=?", "Anna"). I assume name is not a partition key there, but the example could be clearer about that.
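
As an illustration of the indexed-column case, here is a hypothetical sketch (the index name emp_role_idx is invented, and role is indexed on tester.emp, where it is a regular column; whether an index actually suits your data model is a separate question):

    // role is a regular column in tester.emp, so it can carry a secondary index. 
    session.execute("CREATE INDEX emp_role_idx ON tester.emp (role)"); 

    // With the index in place, the connector can push the predicate down to Cassandra. 
    JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "emp", Person.class) 
        .where("role=?", "IT Engineer") 
        .map(new Function<Person, String>() { 
         @Override 
         public String call(Person person) throws Exception { 
          return person.toString(); 
         } 
        }); 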


Thanks for the reply. Hopefully this issue will be fixed soon. – angyxpoo 2014-09-24 17:38:50


@angyxpoo The documentation page has been updated so it is a bit clearer about when you can use where: https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md – BrianC 2014-09-29 04:15:40


Thanks @BrianC, that is definitely a new addition. I'll check it out. – angyxpoo 2014-09-30 16:20:31