0

我的cassandra CF具有日期和id作为分区键。 查询时我只知道日期,所以我遍历了id的范围。Cassandra Spark连接器

我的问题围绕着连接器如何执行下面的代码。

SparkDriver代码看起来像 -

SparkConf conf = new SparkConf().setAppName("DemoApp") 
.conf.setMaster("local[*]") 
.set("spark.cassandra.connection.host", "10.*.*.*") 
.set("spark.cassandra.connection.port", "*"); 

JavaSparkContext sc = new JavaSparkContext(conf); 
SparkContextJavaFunctions javaFunctions = CassandraJavaUtil.javaFunctions(sc); 

String date = "23012017"; 

for(String id : idlist) { 

JavaRDD<CassandraRow> cassandraRowsRDD = 

javaFunctions.cassandraTable("datakeyspace", "sample2") 
      .where("date = ?",date) 
      .where("id = ? ", id) 
      .select("data"); 

cassandraRowsRDDList.add(cassandraRowsRDD); 
} 

List<CassandraRow> collectAllRows = new ArrayList<CassandraRow>(); 
     for(JavaRDD<CassandraRow> rdd : cassandraRowsRDDList){ 
      //do transformations 

      collectAllRows.addAll(rdd.collect()); 
    } 

1)所有我想在IDLIST问我是否循环首先,说IDLIST有1000元,这可能是不断增加的,这将是有效的?每个选择查询如何分布在集群中?特别是如何维护Cassandra DB连接?

2)在我的驱动程序中循环结束后,我将所有行放入List中,然后对每行应用转换并过滤掉重复项。这是否也会通过集群上的火花传播,还是会发生在驾驶员侧。

请帮助。

回答

0

火花cassandra连接器提供了更好的方法。 您可以创建(日期,ID)的rdd,然后调用列date和id上的joinWithCassandraTable函数。连接器巧妙地做到了这一点,所有的数据将只由工作人员提取,而且没有洗牌,每个工作人员只会获取数据的日期和ID。

相关问题