0
我使用星火1.3.1,我已经写了一个小程序,对Cassandra的过滤数据,如卡桑德拉星火连接器和过滤数据
val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("foo", "bar")
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
println(rdd2.count())
sc.stop()
这个程序很长一段时间运行,打印邮件
16/09/01 21:10:31 INFO Executor: Running task 46.0 in stage 0.0 (TID 46)
16/09/01 21:10:31 INFO TaskSetManager: Finished task 42.0 in stage 0.0 (TID 42) in 20790 ms on localhost (43/1350)
如果我终止这个程序,我的代码更改为
val date = DateTime.now().minusHours(1)
val rdd2 = rdd.filter(r => r.getDate("date").after(date.toDate))
它仍然运行了很LO NG时间的消息像
6/09/01 21:14:01 INFO Executor: Running task 8.0 in stage 0.0 (TID 8)
16/09/01 21:14:01 INFO TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 19395 ms on localhost (5/1350)
所以它看起来像程序将始终尝试加载整个卡桑德拉表中存储(或尝试完全扫描的话),然后才应用滤镜。这对我来说似乎极其低效。
如何以更好的方式编写此代码,以便spark不会尝试将整个cassandra表(或完全扫描它)加载到RDD中,然后应用过滤器?
如何检查日期列是否为集群密钥?有一些我可以发出的命令吗? –
我试过你的建议,但将过滤器的结果分配给rdd2,然后对此进行计数。但它仍然在说'完成的任务4.0在阶段0.0(TID 4)在112031毫秒在本地主机(5/1350)' –
集群密钥是一个概念组织在Cassandra磁盘上的信息。这是你的Cassandra Schema的核心部分。既然你没有发布你的代码,我无法回答为什么它会花费任何时间。你应该看到它通过许多任务。但几乎没有任何情况下,它会比没有下推的全表扫描慢。 – RussS