我有一个如何使用spark来操作/迭代/扫描cassandra的多个表的问题。我们的项目使用spark & spark-cassandra-connector连接到cassandra来扫描多个表,尝试在不同的表中匹配相关值,如果匹配,则执行额外的操作,如表插入。使用情况如下图所示:使用spark来扫描多个cassandra表使用spark-cassandra-connector
sc.cassandraTable(KEYSPACE, "table1").foreach(
row => {
val company_url = row.getString("company_url")
sc.cassandraTable(keyspace, "table2").foreach(
val url = row.getString("url")
val value = row.getString("value")
if (company_url == url) {
sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value))
}
)
})
的问题是
火花RDD是不可序列化,原因sc.cassandraTable返回一个RDD嵌套搜索将失败。我知道要解决的唯一方法是使用sc.broadcast(sometable.collect())。但是如果sometable很大,collect会消耗所有的内存。而且,如果在使用情况下,多个表使用广播,则会消耗内存。
RDD.persist可以处理这种情况,而不是广播吗?在我的情况下,我使用sc.cassandraTable来读取RDD中的所有表,并将其保存回磁盘,然后检索数据以便处理。如果它有效,我怎么保证rdd的读取是由块完成的?
除了火花,还有其他工具(如hadoop等??)可以优雅地处理案件吗?
我无法进行连接,因为对于我的大多数情况,我必须使用string.contains来比较相关列,而不是字符串等于运算符。 – user8053367
这将需要一个笛卡儿连接,除非你有像Solr这样的二级索引。 – RussS
谢谢。如果我做笛卡尔连接,结果会很大,可能会耗尽内存? 以及如何使用二级索引来做的东西? – user8053367