2015-05-24 95 views
3

我已经从HBase的转化的RDD:火花RDD发现通过键

VAL hbaseRDD:RDD [(字符串,数组[字符串])]其中tuple._1是rowkey。而数组是HBase中的值。

4929101-ACTIVE, ["4929101","2015-05-20 10:02:44","dummy1","dummy2"] 
4929102-ACTIVE, ["4929102","2015-05-20 10:02:44","dummy1","dummy2"] 
4929103-ACTIVE, ["4929103","2015-05-20 10:02:44","dummy1","dummy2"] 

我也有一个SchemaRDD(ID,DATE1,COL1,COL2,COL3)转化为

VAL refDataRDD:RDD [(字符串,阵列[字符串])]为此,我将遍历和检查它是否存在于hbaseRDD:,

4929103, ["2015-05-21 10:03:44","EV01","col2","col3"] 
4929104, ["2015-05-21 10:03:44","EV02","col2","col3"] 

问题是

  • 如何检查是否有键(tuple._1)/( “4929103”)是否存在于hbaseRDD中并获取相应的值(tuple._2)? - 我不能使用rdd.filter内PairRDD的查找功能,它会抛出“scala.MatchError:空”,但它的作品外

    val filteredRDD = rdd.filter(sqlRow => { 
        val hbaseLookup = hbaseRDD.lookup(sqlRow(0).toString + "-ACTIVE") 
        // if found, check if date1 of hbaseRDD < sqlRow(1) 
        // else if not found, retain row 
        true 
    }) 
    

    我不知道如果是这样的问题,不过,因为我也经历了NPE,当我查找线路切换到:

    val sqlRowHbase = hbaseRDD.filter(row => { 
    

    注:这些行之前,我做了hbaseRDD.count。和hbaseRDD.lookup的rdd.filter

外面工作正常所以基本上,我试图通过hbaseRDD键“发现”,并获得该行/值。加入它们有点复杂,因为两个RDD中的某些值可能为空。这取决于很多情况下哪些数据会保留哪些行。

回答

0

假设您需要查找的一组a_id包含在一个RDD中,我认为您可以使用leftOuterJoin而不是迭代并查找每个值。

我在上面看到了关于date1潜在可变位置的评论。虽然我没有在下面解决它,但我认为这应该在查找本身之前通过某种特定的每行映射来处理。

如果我正确得到伪代码,您的RDD为(id, date),并且希望通过在hbase中查找数据来更新它,并更新日期,如果在hbase中为该id找到一行,并且它的日期早于在refData中的一个。那是对的吗?

如果是这样,假设你有一些像这样的裁判数据:

val refData = sc.parallelize(Array(
("4929103","2015-05-21 10:03:44"), 
("4929104","2015-05-21 10:03:44") 
)) 

而且从HBase的一些行数据:

val hbaseRDD = sc.parallelize(Array(
    ("4929101-ACTIVE", Array("4929101","2015-05-20 10:02:44")), 
    ("4929102-ACTIVE", Array("4929102","2015-05-20 10:02:44")), 
    ("4929103-ACTIVE", Array("4929103","2015-05-20 10:02:44")) 
)) 

然后,你可以从refData做到每个ID的查找到HBase的与一个简单的leftOuterJoin,并找到每一行:更新日期如有必要:

refData 
    // looks up in Hbase all rows whose date1 a_id value matches the id in searchedIds 
    .leftOuterJoin(hbaseRDD.map{ case (rowkey, Array(a_id, date1)) => (a_id, date1)}) 

    // update the date in refData if date from hBase is earlier 
    .map { case (rowKey, (refDate, maybeRowDate)) => (rowKey, chooseDate (refDate, maybeRowDate)) } 
    .collect 


def chooseDate(refDate: String, rowDate: Option[String]) = rowDate match { 

    // if row not found in Hbase: keep ref date 
    case None => refDate 

    case Some(rDate) => 
    if (true) /* replace this by first parsing the date, then check if rowDate < refDate */ 
     rowDate 
    else 
     refDate 
} 
+0

嗨,我不是要更新r dd.date1,我试图通过比较rdd和hbaseRDD的值来过滤rdd。我更新了帖子以澄清事情。 – sophie

+0

感谢您的澄清。 我认为leftOuterJoin仍然是要走的路,它将比迭代rdd和在另一个中查找值的计算花费少得多。 在如上定义的leftOuterJoin之后,基本上你在refData中每行有一个结果行,其中包含来自RefData的数据以及来自Hbase的数据(如果找到)。基于此,我相信您应该能够根据日期编写您描述的过滤/标记逻辑。 – Svend