2014-06-12 115 views
1

我使用calliope即spark插件与cassandra连接。我创建了2个RDDS它看起来像与两个RDD一起工作apache spark

class A val persistLevel = org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK val cas1 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 1") val sc1 = new SparkContext("local", "name it any thing ") var rdd1 = sc.cql3Cassandra[SCALACLASS_1](cas1) var rddResult1 = rdd1.persist(persistLevel)

class B val cas2 = CasBuilder.cql3.withColumnFamily("cassandra_keyspace", "cassandra_coulmn_family 2") var rdd2 = sc1.cql3Cassandra[SCALACLASS_2](cas2) var rddResult2 = rdd2.persist(persistLevel)

莫名其妙下面的代码库使用其他2不工作这将创建一个新的RDD。是否有可能我们不能一起迭代2个RDD?

这里是一个不正常的代码片段 -

case class Report(id: Long, anotherId: Long) 

    var reportRDD = rddResult2.flatMap(f => { 
    val buf = List[Report]() 
    **rddResult1.collect().toList**.foldLeft(buf)((k, v) => { 
     val buf1 = new ListBuffer[Report] 
     buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { 
     buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) 
     }) 
    }) 
    }) 

而如果我更换了大胆的事情并初始化VAL它喜欢 -

val collection = rddResult1.collect().toList 

var reportRDD = rddResult2.flatMap(f => { 
    val buf = List[Report]() 
    **collection**.foldLeft(buf)((k, v) => { 
     val buf1 = new ListBuffer[Report] 
     buf ++ v.INSTANCE_VAR_FROM_SCALACLASS_1.foldLeft(buf1)((ik, iv) => { 
     buf1 += Report(f.INSTANCE_VAR_FROM_SCALACLASS_1, iv.INSTANCE_VAR_FROM_SCALACLASS_2) 
     }) 
    }) 
    }) 

它的工作原理,没有任何交代?

+0

你得到了什么错误? – maasg

+3

这是与该问题相关的很多代码。清理它吗?你会发现在Cassandra中没有混合的相同行为,以及'foldLeft'中的复杂性。否则它会提出一个很好的问题! –

回答

5

您正在将变换与动作混合使用。关闭rdd2.flatMap对工作人员执行,而rdd1.collect是Spark行话中的“操作”,并将数据传回给驱动程序。所以,非正式地说,当你尝试flatmap时,数据不在那里。 (我不知道足够的内部信息 - 是 - 查明确切的根本原因)

如果要分布式操作两个RDD,则应该使用其中一个联接功能(联接, leftOuterJoin,rightOuterJoin,cogroup)。

E.g.

val mappedRdd1 = rdd1.map(x=> (x.id,x)) 
val mappedRdd2 = rdd2.map(x=> (x.customerId, x)) 

val joined = mappedRdd1.join(mappedRdd2) 
joined.flatMap(...reporting logic..).collect 
+0

感谢您的帮助,但不知何故,我没有获得RDD的连接功能。然而,uning'新的PairRDDFunctions(rdd1).join(rdd2)'工作。 – tesnik03

+1

你应该'导入​​org.apache.spark.SparkContext._'并将你的原始rdd映射到一个pairRDD。加入是在关键字上完成的,这是PairRDD中元组的第一个元素。使用上面给出的例子:'val mappedRdd1 = rdd1.map(x =>(x.id,x))'在您的数据模型中使用一个自然PK就可以实现。 – maasg

+0

真棒..谢谢 – tesnik03

2

您可以在应用程序中对RDD进行操作。但是你不能在执行者(工作者节点)上操作RDD。执行者不能发出命令来驱动集群。 flatMap中的代码在执行程序上运行。

在第一种情况下,您尝试对执行程序中的RDD进行操作。我估计你会得到一个NotSerializableException,因为你甚至不能将RDD对象发送给执行者。在第二种情况下,您将RDD内容拖到应用程序中,然后将此简单List发送给执行程序。 (Lambda捕获会自动序列化。)