1

由于从Cassandra中查询数据是有限制的,我试图用Spark批量读取数据并将其存储在RDD中。不能在Spark中联合使用两个CassandraJavaRDD <CassandraRow>

然后我添加所有的RDD,使用联合函数。

这是我的代码。

private void getDataFromCassandra(JavaSparkContext sc) { 


    CassandraJavaRDD<CassandraRow> cassandraRDD = null ; 
    CassandraJavaRDD<CassandraRow> cassandraRDD2 = null; 

    While(Some Condition) 

    cassandraRDD = CassandraJavaUtil 
       .javaFunctions(sc).cassandraTable("dmp", "table").select("abc", "xyz") 
       .where("pid IN ('" + sb + "')"); 

    if(cassandraRDD2==null){ 


    cassandraRDD2=cassandraRDD; 
    } 
    else{ 
     cassandraRDD2 = cassandraRDD2.union(cassandraRDD); 
    } 
}    

}

但在工会,我发现了以下错误。

类型不匹配:不能转换从JavaRDD到CassandraJavaRDD

虽然无论是RDD的是相似类型的。

所以1)须本人申请一个演员的

cassandraRDD2 = (CassandraJavaRDD<CassandraRow>) cassandraRDD2.union(cassandraRDD); 

2)或在RDD之一的类型更改为JavaRDD

+0

你在哪里设置'cassandraRDD2'?它似乎总是空的。 –

+0

在if条件中,我将cassandraRDD2分配给cassandraRDD。 –

+0

你如何执行'null.isEmpty()'?因为这就是你在那里做的 –

回答

2

因为根据docs的问题发生:

方法:联合(JavaRDD其他)返回此RDD的联合和另一个联合。

返回值:JavaRDD

,因此不匹配。

因为根据this

public class CassandraJavaRDD<R> extends JavaRDD<R> { 
... 
} 

CassandraJavaRDD类扩展JavaRDD所以可以使用:

JavaRDD<CassandraRow> cassandraRDD = null; 
JavaRDD<CassandraRow> cassandraRDD2 = null; 

因此union()方法的返回值将匹配其类型。

+0

感谢您的答复。 –

+0

JavaRDD cassandraRDD2 = sc.emptyRDD(); JavaRDD cassandraRDD = sc。emptyRDD();我可以将这两个空RDD联合为cassandraRDD2 = cassandraRDD2.union(cassandraRDD); ? –

+0

你应该可以做到。 –

相关问题