0

我有一个像下面这样的cassandra表,并且想要使用一些条件从cassandra获取记录并将其放置在配置单元表中。Cassandra到Hive使用Spark

卡桑德拉表(employee)入口:

Id Name Amount Time 
1 abc 1000 2017041801 
2 def 1000 2017041802 
3 ghi 1000 2017041803 
4 jkl 1000 2017041804 
5 mno 1000 2017041805 
6 pqr 1000 2017041806 
7 stu 1000 2017041807 

假设该表列是数据类型的字符串。 我们在蜂巢中也有相同的模式。

现在我想导入cassandra记录在2017041801到2017041804之间配置为hive或hdfs。在第二次运行中,我将根据prev运行来提取增量记录。

我可以使用下面的语法将cassandra数据加载到RDD中。现在

val sc = new SparkContext(conf) 
val rdd = sc.cassandraTable("mydb", "Employee") 

我的问题是我怎么能根据条件之间并坚持在蜂箱或蜂房外部表路径筛选的记录筛选该记录。

不幸的是我的时间列不是cassandra表中的集群键。所以我无法使用.where()子句。

我是新来的这个scala和火花。所以,请善意帮助这个过滤器逻辑或任何其他更好的方式来实现这个逻辑使用数据帧,请让我知道。

在此先感谢。

+2

过滤,你可以在火花本身做,上线的东西:保存http://stackoverflow.com/a/39283574/7413631蜂巢这里覆盖http://stackoverflow.com/questions/37050828/save-spark-rdd-to-hive-table –

回答

0
  1. 我推荐使用Connector Dataframe API从C * https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md加载。
  2. 使用df.filter()调用谓词
  3. saveAsTable()方法将数据存储在配置单元中。

这里是引发2.0例如,对于你的情况

val df = spark 
    .read 
    .format("org.apache.spark.sql.cassandra") 
    .options(Map("table" -> "Employee", "keyspace" -> "mydb")) 
    .load() 
df.filter("time between 2017041801 and 2017041804") 
    .write.mode("overwrite").saveAsTable("hivedb.employee"); 
+0

谢谢@Artem Aliev – vkumarg3