2

我通常会从Cassandra的数据加载到Apache的星火使用Java这样:如何将Cassandra ResultSet转换为Spark DataFrame?

SparkContext sparkContext = StorakleSparkConfig.getSparkContext(); 

CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext); 
    sqlContext.setKeyspace("midatabase"); 

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " + 
      "WHERE CAST(store_id as string) = '" + storeId + "'"); 

但是想象一下,我有一个sharder,我需要几个partion密钥加载到该数据帧。我可以在我的查询中使用WHERE IN(...)并再次使用cassandraSql方法。但是由于在协调器节点方面存在单点故障的臭名昭着的问题,我有点不情愿使用WHERE IN。这是在这里解释说:

https://lostechies.com/ryansvihla/2014/09/22/cassandra-query-patterns-not-using-the-in-query-for-multiple-partitions/

是否有使用几个查询,但它们加载到一个单一的数据帧的方法吗?

回答

1

这样做的一种方法是运行单个查询和unionAll/union多个DataFrames/RDDs。

SparkContext sparkContext = StorakleSparkConfig.getSparkContext(); 

CassandraSQLContext sqlContext = new CassandraSQLContext(sparkContext); 
    sqlContext.setKeyspace("midatabase"); 

DataFrame customersOne = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " + "WHERE CAST(store_id as string) = '" + storeId1 + "'"); 

DataFrame customersTwo = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM store_customer " + "WHERE CAST(store_id as string) = '" + storeId2 + "'"); 

DataFrame allCustomers = customersOne.unionAll(CustomersTwo) 
+1

感谢您的回答!是的,我想到了这一点,但不确定Spark方面的性能影响。你认为有什么? –

+0

@MilenKovachev联盟非常高效,因为它不需要任何洗牌。但是,请注意,它可能会删除您的分区。看到这里:http://stackoverflow.com/questions/29977526/in-apache-spark-why-does-rdd-union-does-not-preserve-partitioner –

+0

假设我有一个可变数量的密钥,我需要检索,我将不得不在for循环中运行查询。有没有办法同时运行单个sqlContext.cassandraSql语句? –

相关问题