假设我们有一个RF = N的Cassandra集群和一个包含宽行的表。从Cassandra向Spark分布式加载宽行
我们的表可以有一个指数是这样的:pk/ck1/ck2/....
如果我们在表中创建一个行的RDD如下:
val wide_row = sc.cassandraTable(KS, TABLE).select("c1", "c2").where("pk = ?", PK)
我注意到一个星火节点具有100%数据和其他数据都没有。我认为这是因为spark-cassandra连接器无法将查询令牌范围分解为更小的子范围,因为它实际上不是范围 - 它只是PK的散列。
在这一点上,我们可以简单地调用redistribute(N)
在处理之前在Spark集群中传播数据,但这会影响通过网络将数据移动到已在Cassandra中本地存在数据的节点(记住RF = N)
我们真正想要的是让每个Spark节点从Cassandra本地加载行的子集(切片)。
想到的一种方法是在pk = PK时生成包含第一个群集密钥(ck1)的不同值列表的RDD。然后,我们可以使用mapPartitions()
根据ck1的每个值加载宽行的一部分。
假设我们已经有了我们的CK1列表值,我们可以写这样的事:
val ck1_list = .... // RDD
ck1_list.repartition(ck1_list.count().toInt) // create a partition for each value of ck1
val wide_row = ck1_list.mapPartitions(f)
在分区迭代器,F(),我们想调用另一个函数g(PK,CK1)从Cassandra中加载行片以获得分区键pk
和集群键ck1
。然后,我们可以将flatMap
应用于ck1_list
,以便在没有任何shuffing的情况下创建宽行的完全分布式RDD。
所以这里的问题:
是否有可能从星火任务中做出CQL电话吗?应该使用什么驱动程序?它可以只设置一次,以供后续任务重复使用吗?
任何帮助将不胜感激,谢谢。