2017-05-24 84 views
0

我有一个如何使用spark来操作/迭代/扫描cassandra的多个表的问题。我们的项目使用spark & spark-cassandra-connector连接到cassandra来扫描多个表,尝试在不同的表中匹配相关值,如果匹配,则执行额外的操作,如表插入。使用情况如下图所示:使用spark来扫描多个cassandra表使用spark-cassandra-connector

sc.cassandraTable(KEYSPACE, "table1").foreach(
    row => { 
    val company_url = row.getString("company_url") 

    sc.cassandraTable(keyspace, "table2").foreach(
     val url = row.getString("url") 
     val value = row.getString("value") 
     if (company_url == url) { 
      sc.saveToCassandra(KEYSPACE, "target", SomeColumns(url, value)) 
     } 
    ) 
}) 

的问题是

  1. 火花RDD是不可序列化,原因sc.cassandraTable返回一个RDD嵌套搜索将失败。我知道要解决的唯一方法是使用sc.broadcast(sometable.collect())。但是如果sometable很大,collect会消耗所有的内存。而且,如果在使用情况下,多个表使用广播,则会消耗内存。

  2. RDD.persist可以处理这种情况,而不是广播吗?在我的情况下,我使用sc.cassandraTable来读取RDD中的所有表,并将其保存回磁盘,然后检索数据以便处理。如果它有效,我怎么保证rdd的读取是由块完成的?

  3. 除了火花,还有其他工具(如hadoop等??)可以优雅地处理案件吗?

回答

0

它看起来像你实际上试图做一系列的内部联接。见

joinWithCassandraTable方法

这可以让你使用一个RDD的元素做一个卡桑德拉表直接查询。根据您从Cassandra读取的数据部分,这可能是您最好的选择。如果分数太大,尽管您最好单独阅读两个表,然后使用RDD.join方法排列行。

如果一切都失败了,你总是可以手动使用CassandraConnector对象来直接访问Java驱动程序,并使用分布式环境中的原始请求。

+0

我无法进行连接,因为对于我的大多数情况,我必须使用string.contains来比较相关列,而不是字符串等于运算符。 – user8053367

+0

这将需要一个笛卡儿连接,除非你有像Solr这样的二级索引。 – RussS

+0

谢谢。如果我做笛卡尔连接,结果会很大,可能会耗尽内存? 以及如何使用二级索引来做的东西? – user8053367