由于从Cassandra中查询数据是有限制的,我试图用Spark批量读取数据并将其存储在RDD中。不能在Spark中联合使用两个CassandraJavaRDD <CassandraRow>
然后我添加所有的RDD,使用联合函数。
这是我的代码。
private void getDataFromCassandra(JavaSparkContext sc) {
CassandraJavaRDD<CassandraRow> cassandraRDD = null ;
CassandraJavaRDD<CassandraRow> cassandraRDD2 = null;
While(Some Condition)
cassandraRDD = CassandraJavaUtil
.javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz")
.where("pid IN ('" + sb + "')");
if(cassandraRDD2==null){
cassandraRDD2=cassandraRDD;
}
else{
cassandraRDD2 = cassandraRDD2.union(cassandraRDD);
}
}
}
但在工会,我发现了以下错误。
类型不匹配:不能转换从JavaRDD到CassandraJavaRDD
虽然无论是RDD的是相似类型的。
所以1)须本人申请一个演员的
cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD);
2)或在RDD之一的类型更改为JavaRDD
你在哪里设置'cassandraRDD2'?它似乎总是空的。 –
在if条件中,我将cassandraRDD2分配给cassandraRDD。 –
你如何执行'null.isEmpty()'?因为这就是你在那里做的 –