我已经从HBase的转化的RDD:火花RDD发现通过键
VAL hbaseRDD:RDD [(字符串,数组[字符串])]其中tuple._1是rowkey。而数组是HBase中的值。
4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"]
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"]
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"]
我也有一个SchemaRDD(ID,DATE1,COL1,COL2,COL3)转化为
VAL refDataRDD:RDD [(字符串,阵列[字符串])]为此,我将遍历和检查它是否存在于hbaseRDD:,
4929103, ["2015-05-21 10:03:44","EV01","col2","col3"]
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"]
问题是
如何检查是否有键(tuple._1)/( “4929103”)是否存在于hbaseRDD中并获取相应的值(tuple._2)? - 我不能使用rdd.filter内PairRDD的查找功能,它会抛出“scala.MatchError:空”,但它的作品外
val filteredRDD = rdd.filter(sqlRow => { val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE") // if found, check if date1 of hbaseRDD < sqlRow(1) // else if not found, retain row true })
我不知道如果是这样的问题,不过,因为我也经历了NPE,当我查找线路切换到:
val sqlRowHbase = hbaseRDD.filter(row => {
注:这些行之前,我做了hbaseRDD.count。和hbaseRDD.lookup的rdd.filter
外面工作正常所以基本上,我试图通过hbaseRDD键“发现”,并获得该行/值。加入它们有点复杂,因为两个RDD中的某些值可能为空。这取决于很多情况下哪些数据会保留哪些行。
嗨,我不是要更新r dd.date1,我试图通过比较rdd和hbaseRDD的值来过滤rdd。我更新了帖子以澄清事情。 – sophie
感谢您的澄清。 我认为leftOuterJoin仍然是要走的路,它将比迭代rdd和在另一个中查找值的计算花费少得多。 在如上定义的leftOuterJoin之后,基本上你在refData中每行有一个结果行,其中包含来自RefData的数据以及来自Hbase的数据(如果找到)。基于此,我相信您应该能够根据日期编写您描述的过滤/标记逻辑。 – Svend