0
我与火花流&卡桑德拉连接器的工作,和我有一些DAO等价类(使用Scala工作)。 在这个道,我想创建“CassandraTableScanRDD”。 问题是我没有sparkContext。我无法通过它, ,因为它不可序列化。如何创建“CassandraTableScanRDD”不sparkContext
这里是一个简单的应用程序,它说明问题:
object Engine {
def main(args: Array[String]) {
val sparkConf = new SparkConf()
val checkpointDir: String = getCheckpointDirectory()
val ssc = StreamingContext.getOrCreate(checkpointDir,() => {
val streamingContext:StreamingContext = new StreamingContext(sparkConf, batchDuration)
streamingContext.checkpoint(checkpointDir)
val dstreamsAvroEvents: InputDStream[ConsumerRecord[String, Array[Byte]]] = KafkaUtils.createDirectStream[String, Array[Byte]](
streamingContext,
PreferConsistent,
Subscribe[String, Array[Byte]](topics, kafkaParams)
)
Algo.processStream(dstreamsAvroEvents.map(x=>x.value()))
})
ssc.start()
ssc.awaitTermination()
}
}
object Algo {
val dao = new Dao(keyspace)
def processStream(avroEvents:DStream[Array[Byte]]):Unit = {
dao.findEntityBy(<somekey based on rdd>)
}
}
class Dao {
def getCassandraMappedTable():CassandraTableScanRDD[Entity] = {
//========================================
//HOW CAN I OBTAIN THE SPARK CONTEXT ???
//========================================
sparkContext.cassandraTable[Entity](keyspace, tableName)
}
def findEntityBy(someKey:String):FutureAction[Seq[Entity]] = {
getCassandraMappedTable()
.select(columnsNames: _*)
.where(eq(Columns.SOME_KEY), someKey)
.collectAsync()
}
}
任何想法如何解决这个问题呢?
感谢, 叶兰
如果没有SparkContext句点,则无法创建RDD。为什么你需要它序列化?你是否试图在另一个RDD的转换中创建一个RDD?如果是这样的 - 这是(故意)是不可能的,你的问题似乎是与设计 –
我试图执行卡桑德拉在下面的代码查询:“sparkContext.cassandraTable [SomeEntity(密钥空间,表名)。选择(columnsNames:_ * ) .where(eq(Columns.MY_KEY),somekey1) .collectAsync()' – EranM
这并不回答这个问题 - 你说“我不能将它传递给[SparkContext],因为它不可序列化” - 为什么它必须是可序列化的吗?代码在哪里执行?如果它在另一个RDD转换中 - 就像我写的那样 - 这是不可能的,你可能需要使用类似'join'的东西,但是我没有更多的上下文我不能说出来。 –