2016-09-02 100 views
0

我使用星火1.3.1,我已经写了一个小程序,对Cassandra的过滤数据,如卡桑德拉星火连接器和过滤数据

val sc = new SparkContext(conf) 
val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 
println(rdd2.count()) 
sc.stop() 

这个程序很长一段时间运行,打印邮件

16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46) 
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350) 

如果我终止这个程序,我的代码更改为

val date = DateTime.now().minusHours(1) 
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate)) 

它仍然运行了很LO NG时间的消息像

6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8) 
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350) 

所以它看起来像程序将始终尝试加载整个卡桑德拉表中存储(或尝试完全扫描的话),然后才应用滤镜。这对我来说似乎极其低效。

如何以更好的方式编写此代码,以便spark不会尝试将整个cassandra表(或完全扫描它)加载到RDD中,然后应用过滤器?

回答

1

你的代码

val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)).count // Count Filtered RDD 

第一块所以,要小心。 RDD是不可变的,因此当您应用过滤器时,您需要使用返回的RDD,而不是您应用该函数的RDD。


val rdd = sc.cassandraTable("foo", "bar") 
val date = DateTime.now().minusDays(30) 
rdd.filter(r => r.getDate("date").after(date.toDate)) // Filters RDD 
println(rdd.cassandraCount()) // Ignores filtered rdd and counts everything 

更多efficency从卡桑德拉阅读:

如果您的日期栏是一个聚集键可以使用.where功能谓词下推卡桑德拉。除此之外,你可以做的修剪数据服务器端的事情不多。

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md#filtering-rows---where

+0

如何检查日期列是否为集群密钥?有一些我可以发出的命令吗? –

+0

我试过你的建议,但将过滤器的结果分配给rdd2,然后对此进行计数。但它仍然在说'完成的任务4.0在阶段0.0(TID 4)在112031毫秒在本地主机(5/1350)' –

+1

集群密钥是一个概念组织在Cassandra磁盘上的信息。这是你的Cassandra Schema的核心部分。既然你没有发布你的代码,我无法回答为什么它会花费任何时间。你应该看到它通过许多任务。但几乎没有任何情况下,它会比没有下推的全表扫描慢。 – RussS