2014-09-30 12 views
1

假设我们有一个RF = N的Cassandra集群和一个包含宽行的表。从Cassandra向Spark分布式加载宽行

我们的表可以有一个指数是这样的:pk/ck1/ck2/....

如果我们在表中创建一个行的RDD如下:

val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK) 

我注意到一个星火节点具有100%数据和其他数据都没有。我认为这是因为spark-cassandra连接器无法将查询令牌范围分解为更小的子范围,因为它实际上不是范围 - 它只是PK的散列。

在这一点上,我们可以简单地调用redistribute(N)在处理之前在Spark集群中传播数据,但这会影响通过网络将数据移动到已在Cassandra中本地存在数据的节点(记住RF = N)

我们真正想要的是让每个Spark节点从Cassandra本地加载行的子集(切片)。

想到的一种方法是在pk = PK时生成包含第一个群集密钥(ck1)的不同值列表的RDD。然后,我们可以使用mapPartitions()根据ck1的每个值加载宽行的一部分。

假设我们已经有了我们的CK1列表值,我们可以写这样的事:

val ck1_list = .... // RDD 

ck1_list.repartition(ck1_list.count().toInt) // create a partition for each value of ck1 

val wide_row = ck1_list.mapPartitions(f) 

在分区迭代器,F(),我们想调用另一个函数g(PK,CK1)从Cassandra中加载行片以获得分区键pk和集群键ck1。然后,我们可以将flatMap应用于ck1_list,以便在没有任何shuffing的情况下创建宽行的完全分布式RDD。

所以这里的问题:

是否有可能从星火任务中做出CQL电话吗?应该使用什么驱动程序?它可以只设置一次,以供后续任务重复使用吗?

任何帮助将不胜感激,谢谢。

回答

4

为了将来的参考,我将解释我是如何解决这个问题的。

我实际上使用了一个略有不同的方法来描述上述的一个方法,它不涉及从Spark任务中调用Cassandra。

我从ck_list开始,这是pk = PK时第一个簇密钥的不同值列表。这里没有显示代码,但我实际上是使用CQL从Spark驱动程序直接从Cassandra下载了这个列表。

然后,我将ck_list转换为RDDS列表。接下来我们将RDD(每一个代表一个Cassandra行片)组合成一个统一的RDD(wide_row)。

演员阵容上CassandraRDD是必要的,因为union返回类型为org.apache.spark.rdd.RDD

运行我能够验证wide_row有X分区,其中x是ck_list大小的作业之后。一个有用的副作用是wide_row由第一个集群密钥分区,这也是我想要减少的密钥。因此避免了更多的混洗。

我不知道这是否是实现我想要的最好方法,但它确实有效。

val ck_list // list first cluster key values where pk = PK 

val wide_row = ck_list.map(ck => 
    sc.cassandraTable(KS, TBL) 
    .select("c1", "c2").where("pk = ? and ck1 = ?", PK, ck) 
    .asInstanceOf[org.apache.spark.rdd.RDD] 
).reduce((x, y) => x.union(y))