0

我与火花流&卡桑德拉连接器的工作,和我有一些DAO等价类(使用Scala工作)。 在这个道,我想创建“CassandraTableScanRDD”。 问题是我没有sparkContext。我无法通过它, ,因为它不可序列化。如何创建“CassandraTableScanRDD”不sparkContext

这里是一个简单的应用程序,它说明问题:

object Engine { 
    def main(args: Array[String]) { 
     val sparkConf = new SparkConf() 

     val checkpointDir: String = getCheckpointDirectory() 
     val ssc = StreamingContext.getOrCreate(checkpointDir,() => { 

      val streamingContext:StreamingContext = new StreamingContext(sparkConf, batchDuration) 
      streamingContext.checkpoint(checkpointDir) 
      val dstreamsAvroEvents: InputDStream[ConsumerRecord[String, Array[Byte]]] = KafkaUtils.createDirectStream[String, Array[Byte]](
       streamingContext, 
       PreferConsistent, 
       Subscribe[String, Array[Byte]](topics, kafkaParams) 
      ) 

      Algo.processStream(dstreamsAvroEvents.map(x=>x.value())) 
     })  

     ssc.start() 
     ssc.awaitTermination() 

    } 


} 

object Algo { 
    val dao = new Dao(keyspace) 

    def processStream(avroEvents:DStream[Array[Byte]]):Unit = { 
     dao.findEntityBy(<somekey based on rdd>) 
    } 

} 

class Dao { 

    def getCassandraMappedTable():CassandraTableScanRDD[Entity] = { 
    //======================================== 
    //HOW CAN I OBTAIN THE SPARK CONTEXT ??? 
    //======================================== 
    sparkContext.cassandraTable[Entity](keyspace, tableName) 
    } 

    def findEntityBy(someKey:String):FutureAction[Seq[Entity]] = { 
    getCassandraMappedTable() 
     .select(columnsNames: _*) 
     .where(eq(Columns.SOME_KEY), someKey) 
     .collectAsync() 
    } 

} 

任何想法如何解决这个问题呢?

感谢, 叶兰

+2

如果没有SparkContext句点,则无法创建RDD。为什么你需要它序列化?你是否试图在另一个RDD的转换中创建一个RDD?如果是这样的 - 这是(故意)是不可能的,你的问题似乎是与设计 –

+0

我试图执行卡桑德拉在下面的代码查询:“sparkContext.cassandraTable [SomeEntity(密钥空间,表名)。选择(columnsNames:_ * ) .where(eq(Columns.MY_KEY),somekey1) .collectAsync()' – EranM

+1

这并不回答这个问题 - 你说“我不能将它传递给[SparkContext],因为它不可序列化” - 为什么它必须是可序列化的吗?代码在哪里执行?如果它在另一个RDD转换中 - 就像我写的那样 - 这是不可能的,你可能需要使用类似'join'的东西,但是我没有更多的上下文我不能说出来。 –

回答

0

您可以通过在使用SparkContext@transient,把我的头顶部,应该工作。

class Dao(@transient context: SparkContext) { 
    .. 
} 
+0

我使用检查点。所以当RDD在火花流中恢复时,它缺少SparkContext。所以,我没有地方从SparkContext拉。 – EranM