我使用calliope即spark插件与cassandra连接。我创建了2个RDDS它看起来像与两个RDD一起工作apache spark
class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)
class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)
莫名其妙下面的代码库使用其他2不工作这将创建一个新的RDD。是否有可能我们不能一起迭代2个RDD?
这里是一个不正常的代码片段 -
case class Report(id: Long, anotherId: Long)
var reportRDD = rddResult2.flatMap(f => {
val buf = List[Report]()
**rddResult1.collect().toList**.foldLeft(buf)((k, v) => {
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
})
})
})
而如果我更换了大胆的事情并初始化VAL它喜欢 -
val collection = rddResult1.collect().toList
var reportRDD = rddResult2.flatMap(f => {
val buf = List[Report]()
**collection**.foldLeft(buf)((k, v) => {
val buf1 = new ListBuffer[Report]
buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => {
buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2)
})
})
})
它的工作原理,没有任何交代?
你得到了什么错误? – maasg
这是与该问题相关的很多代码。清理它吗?你会发现在Cassandra中没有混合的相同行为,以及'foldLeft'中的复杂性。否则它会提出一个很好的问题! –