2016-01-27 82 views
1

我目前正在使用Python批量装载CSV数据到HBase的表,我目前有使用写相应的HFiles麻烦saveAsNewAPIHadoopFile星火流 - HBase的批量加载

我目前的代码如下所示:

def csv_to_key_value(row): 
    cols = row.split(",") 
    result = ((cols[0], [cols[0], "f1", "c1", cols[1]]), 
       (cols[0], [cols[0], "f2", "c2", cols[2]]), 
       (cols[0], [cols[0], "f3", "c3", cols[3]])) 
    return result 

def bulk_load(rdd): 
    conf = {#Ommitted to simplify} 

    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 

    load_rdd = rdd.flatMap(lambda line: line.split("\n"))\ 
        .flatMap(csv_to_key_value) 
    if not load_rdd.isEmpty(): 
     load_rdd.saveAsNewAPIHadoopFile("file:///tmp/hfiles" + startTime, 
             "org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2", 
             conf=conf, 
             keyConverter=keyConv, 
             valueConverter=valueConv) 
    else: 
     print("Nothing to process") 

当我运行这段代码,我得到以下错误:

java.io.IOException: Added a key not lexically larger than previous. Current cell = 10/f1:c1/1453891407213/Minimum/vlen=1/seqid=0, lastCell = /f1:c1/1453891407212/Minimum/vlen=1/seqid=0 at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:204)

由于错误表示关键是问题,我抓住了我的RDD中的元素,它们如下(为便于阅读而格式化)

[(u'1', [u'1', 'f1', 'c1', u'A']), 
(u'1', [u'1', 'f2', 'c2', u'1A']), 
(u'1', [u'1', 'f3', 'c3', u'10']), 
(u'2', [u'2', 'f1', 'c1', u'B']), 
(u'2', [u'2', 'f2', 'c2', u'2B']), 
(u'2', [u'2', 'f3', 'c3', u'9']), 

。 。 。

(u'9', [u'9', 'f1', 'c1', u'I']), 
(u'9', [u'9', 'f2', 'c2', u'3C']), 
(u'9', [u'9', 'f3', 'c3', u'2']), 
(u'10', [u'10', 'f1', 'c1', u'J']), 
(u'10', [u'10', 'f2', 'c2', u'1A']), 
(u'10', [u'10', 'f3', 'c3', u'1'])] 

这是与我的CSV完美匹配的正确顺序。据我所知,在HBase中,一个关键字由{row,family,timestamp}定义。行和家族的组合对于我的数据中的所有条目都是独特和单调递增的,并且我没有时间戳的控制(这是我能想象的唯一问题)

任何人都可以告诉我如何避免/问题?

回答

0

嗯,这只是我的一个愚蠢的错误,我觉得有点愚蠢。字典顺序,顺序应该是1,10,2,3 ... 8,以保证正确的排序9.最简单的方法之前加载:

rdd.sortByKey(true); 

我希望我可以节省至少一人头痛我有。