2016-09-29 84 views
3

我正在编写一个项目,接收来自Kafka的数据并写入Hbase表。因为我想知道记录的差异,所以我需要先用Hbase中的相​​同rowkey获取记录,然后对接收到的记录进行相减,最后将新记录保存到HBase表中。在Spark Streaming中读取Hbase数据

在开始时,我试着用newAPIHadoop从hbase获取数据。这里是我的尝试:

val conf = HBaseConfiguration.create() 
conf.set("zookeeper.znode.parent", "/hbase-secure") 
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
conf.set("hbase.zookeeper.quorum", zkQuorum) 
conf.set("hbase.master", masterAddr) 
conf.set("hbase.zookeeper.property.clientPort", portNum) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName) 

val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf, 
     classOf[TableInputFormat], 
     classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], 
     classOf[org.apache.hadoop.hbase.client.Result]) 

通过这种方式,我能够与特定列家人和列名只有一次获得的记录的值。通过只说一次,我的意思是每次我开始我的spark-streaming应用程序,这段代码将被执行,我可以得到一个值,但它不会再执行。因为每次我收到卡夫卡的记录时,我想用cf和列从HBase中读取我的记录,这对我来说不起作用。

为了解决这个问题,我将逻辑移动到foreachRDD(),但不幸的是sparkContext看起来不可序列化。我得到了像task is not serialzable这样的错误。

最后,我发现有另一种方法可以使用hbase.clinet HTable从hbase读取数据。所以这是我最后的工作:

def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = { 
    val conf = HBaseConfiguration.create() 
    conf.set("zookeeper.znode.parent", "/hbase-secure") 
    conf.set("hbase.zookeeper.quorum", "xxxxxx") 
    conf.set("hbase.master", "xxxx") 
    conf.set("hbase.zookeeper.property.clientPort", "xxx") 
    conf.set(TableInputFormat.INPUT_TABLE, "xx") 
    conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx") 

    val testTable = new HTable(conf, "testTable") 
    val scan = new Scan 
    scan.addColumn("cf1".getBytes, "test".getBytes) 
    val rs = testTable.getScanner(scan) 

    var r = rs.next() 
    val res = new StringBuilder 
    while(r != null){ 
     val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes)) 

     res.append(tmp) 
     r= rs.next() 
    } 
val res = res.toString 

//do the following manipulations and return object (ImmutableBytesWritable, Put) 
     .............................. 
     ....................... 
      } 

在main方法我在foreachRDD上述方法使用和保存到HBase的使用方法saveAsNewAPIHadoopDataset

streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration)) 

这对我现在的工作很好,但我有疑问关于这个过程:

以这种方式,我猜想,对于RDD的每个分区,都会创建一个到HBase的连接。我想知道是否可以扩大我的应用程序。假如我在1秒内有超过1000条记录,看起来在我的火花流式传输中会建立1000个连接。

这是从hbase读取数据的正确方法吗?在sparkStreaming中从HBase读取数据的最佳实践是什么?或者火花流不应该读取任何数据,它只是设计为将数据流写入数据库。

在此先感谢。

回答

0

foreachRDD在单个执行程序jvm进程上执行。至少你可以在transferToHBasePut方法中获得conf的单例实例(意味着在使用jvm进程或新conf的现有set conf之前进行空检查)。所以这会将Hbase连接的数量减少到Spark集群中产生的执行程序的数量。

希望这有助于...

+0

谢谢你回答我的问题。我通过将conf作为参数传递给方法transferToHBasePut来尝试解决方案。但正如你所说的foreach在每个执行者jvm进程上执行,单身人士不能从司机转移到工人。我认为这是因为配置不可分割。最后我发现有一种叫做foreachPartition的方法可以用于RDD。该方法将保证每个RDD分区只建立一次连接。 – Frankie

3

一些学习之后,我创建RDD的每个分区的配置。检查foreachRDD的设计模式Spark Streaming official website。实际上配置不是连接,所以我不知道如何从现有连接池获取连接并获取Hbase的记录。

+0

您是否已经通过Spark流向HBase进行了读取?我只能通过打开每个数据的连接来阅读它。有什么办法做到这一点? – zorkaya

相关问题