2017-04-06 85 views
0

这很简单“如何”问题:: 我们可以通过com.databricks.spark.csv将数据带到Spark环境。我知道如何通过spark创建HBase表,并手动将数据写入HBase表。但是,甚至可以通过Spark将文本/ csv/jason文件直接加载到HBase?我看不到有人在谈论它。所以,只是检查。如果可能的话,请指导我一个很好的网站,详细解释scala代码以完成它。通过Spark加载csv文件到HBase

谢谢

回答

0

有多种方法可以做到这一点。

  1. 星火HBase的连接器:

https://github.com/hortonworks-spark/shc

你可以看到很多的例子的链接。

  1. 此外,您可以使用SPark核心使用HbaseConfiguration将数据加载到Hbase。

代码示例:

val fileRDD = sc.textFile(args(0), 2) 
    val transformedRDD = fileRDD.map { line => convertToKeyValuePairs(line) } 

    val conf = HBaseConfiguration.create() 
    conf.set(TableOutputFormat.OUTPUT_TABLE, "tableName") 
    conf.set("hbase.zookeeper.quorum", "localhost:2181") 
    conf.set("hbase.master", "localhost:60000") 
    conf.set("fs.default.name", "hdfs://localhost:8020") 
    conf.set("hbase.rootdir", "/hbase") 

    val jobConf = new Configuration(conf) 
    jobConf.set("mapreduce.job.output.key.class", classOf[Text].getName) 
    jobConf.set("mapreduce.job.output.value.class", classOf[LongWritable].getName) 
    jobConf.set("mapreduce.outputformat.class", classOf[TableOutputFormat[Text]].getName) 

    transformedRDD.saveAsNewAPIHadoopDataset(jobConf) 



def convertToKeyValuePairs(line: String): (ImmutableBytesWritable, Put) = { 

    val cfDataBytes = Bytes.toBytes("cf") 
    val rowkey = Bytes.toBytes(line.split("\\|")(1)) 
    val put = new Put(rowkey) 

    put.add(cfDataBytes, Bytes.toBytes("PaymentDate"), Bytes.toBytes(line.split("|")(0))) 
    put.add(cfDataBytes, Bytes.toBytes("PaymentNumber"), Bytes.toBytes(line.split("|")(1))) 
    put.add(cfDataBytes, Bytes.toBytes("VendorName"), Bytes.toBytes(line.split("|")(2))) 
    put.add(cfDataBytes, Bytes.toBytes("Category"), Bytes.toBytes(line.split("|")(3))) 
    put.add(cfDataBytes, Bytes.toBytes("Amount"), Bytes.toBytes(line.split("|")(4))) 
    return (new ImmutableBytesWritable(rowkey), put) 
    } 
  • 您也可以使用这一个
  • https://github.com/nerdammer/spark-hbase-connector