
I have a streaming application running off of Kafka, and I was wondering if there is a way to do range queries from inside a map function? That is, to query Cassandra from the Spark executors.

I group the messages coming from Kafka by time range and key, and based on those time ranges and keys I want to pull data from Cassandra into that DStream.

Something like:

lookups
  .map(lookup => ((lookup.key, lookup.startTime, lookup.endTime), lookup))
  .groupByKey()
  .transform(rdd => {
    val cassandraSQLContext = new CassandraSQLContext(rdd.context)
    rdd.map(lookupPair => {
      val tableName = ... // variable based on lookup
      val startTime = lookupPair._1._2
      val endTime = lookupPair._1._3

      cassandraSQLContext
        .cassandraSql(s"SELECT * FROM ${CASSANDRA_KEYSPACE}.${tableName} WHERE key=${...} AND start_time >= ${startTime} AND start_time < ${endTime};")
        .map(row => row match {
          case /* case 1 */ => new object1(row)
          case /* case 2 */ => new object2(row)
        })
        .collect()
    })
  })

This gives me a null pointer exception:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 59.0 failed 1 times, most recent failure: Lost task 0.0 in stage 59.0 (TID 63, localhost): java.lang.NullPointerException 
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:231) 
at org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:70) 
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:130) 
at RollupFineGrainIngestionService$$anonfun$11$$anonfun$apply$2.apply(MyFile.scala:123) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) 
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285) 
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 

I have also tried ssc.cassandraTable(CASSANDRA_KEYSPACE, tableName).where("key = ?", ...)..., but that crashes when trying to access the StreamingContext from inside a map.

If anyone has any suggestions, I would appreciate it. Thanks!

Answer

If your queries are based on the partition key, you probably want to use joinWithCassandraTable.
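
For example, a minimal sketch (assuming the partition key column is key and the clustering column is start_time, as in the query above; tableName and CASSANDRA_KEYSPACE are the question's own placeholders, and the time bounds here are driver-side constants, not per-key values):

import com.datastax.spark.connector._

lookups.transform(rdd =>
  rdd
    // project each lookup down to its partition key
    .map(lookup => Tuple1(lookup.key))
    // join on the partition key, then narrow by the clustering column;
    // the joined RDD accepts the same where clauses as a CassandraTableRDD
    .joinWithCassandraTable(CASSANDRA_KEYSPACE, tableName)
    .where("start_time >= ? AND start_time < ?", startTime, endTime)
)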

But if you need more flexibility,

CassandraConnector(sc.getConf).withSessionDo(session => ...) 

will give you access to the session pool on the executors and let you execute whatever you want without managing connections. The code is all serializable and can be placed inside maps.
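
A minimal sketch of that approach, assuming the grouped ((key, startTime, endTime), values) DStream from the question (here called grouped; tableName and CASSANDRA_KEYSPACE remain the question's own placeholders):

import scala.collection.JavaConverters._
import com.datastax.spark.connector.cql.CassandraConnector

// built once on the driver; CassandraConnector is serializable, so the
// closure below can carry it to the executors
val connector = CassandraConnector(sc.getConf)

grouped.transform(rdd =>
  rdd.map { case ((key, startTime, endTime), _) =>
    connector.withSessionDo { session =>
      // per-key range query with bind values; the executors reuse the
      // connector's session pool, so no connection management is needed
      val rs = session.execute(
        s"SELECT * FROM ${CASSANDRA_KEYSPACE}.${tableName} " +
          "WHERE key = ? AND start_time >= ? AND start_time < ?",
        key, startTime, endTime)
      // materialize before the session returns to the pool; in practice,
      // map each Row to a serializable domain object here
      rs.all().asScala.toList
    }
  }
)

Note that the rows are materialized inside withSessionDo because the session goes back to the pool as soon as the block exits.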

I don't think 'SparkConf' is serializable; I ran into serialization issues. Also, I was trying to avoid joinWithCassandraTable because I can't do range queries with it. – nickn

You can use range queries with joinWithCassandraTable; it accepts all the clauses a CassandraTableRDD accepts. And CassandraConnector is serializable: val cc = CassandraConnector(sc.getConf), then use cc wherever you like. – RussS

CassandraConnector worked, thanks! – nickn