我正在编写一个项目,接收来自Kafka的数据并写入Hbase表。因为我想知道记录的差异,所以我需要先用Hbase中的相同rowkey获取记录,然后对接收到的记录进行相减,最后将新记录保存到HBase表中。在Spark Streaming中读取Hbase数据
在开始时,我试着用newAPIHadoop
从hbase获取数据。这里是我的尝试:
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
conf.set("hbase.zookeeper.quorum", zkQuorum)
conf.set("hbase.master", masterAddr)
conf.set("hbase.zookeeper.property.clientPort", portNum)
conf.set(TableInputFormat.INPUT_TABLE, tableName)
conf.set(TableInputFormat.SCAN_COLUMNS, cfName + ":" + colName)
val HbaseRDD = ssc.sparkContext.newAPIHadoopRDD(conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
通过这种方式,我能够与特定列家人和列名只有一次获得的记录的值。通过只说一次,我的意思是每次我开始我的spark-streaming应用程序,这段代码将被执行,我可以得到一个值,但它不会再执行。因为每次我收到卡夫卡的记录时,我想用cf和列从HBase中读取我的记录,这对我来说不起作用。
为了解决这个问题,我将逻辑移动到foreachRDD()
,但不幸的是sparkContext看起来不可序列化。我得到了像task is not serialzable
这样的错误。
最后,我发现有另一种方法可以使用hbase.clinet HTable从hbase读取数据。所以这是我最后的工作:
def transferToHBasePut(line: String): (ImmutableBytesWritable, Put) = {
val conf = HBaseConfiguration.create()
conf.set("zookeeper.znode.parent", "/hbase-secure")
conf.set("hbase.zookeeper.quorum", "xxxxxx")
conf.set("hbase.master", "xxxx")
conf.set("hbase.zookeeper.property.clientPort", "xxx")
conf.set(TableInputFormat.INPUT_TABLE, "xx")
conf.set(TableInputFormat.SCAN_COLUMNS, "xxxxx")
val testTable = new HTable(conf, "testTable")
val scan = new Scan
scan.addColumn("cf1".getBytes, "test".getBytes)
val rs = testTable.getScanner(scan)
var r = rs.next()
val res = new StringBuilder
while(r != null){
val tmp = new String(r.getValue("cf1".getBytes, "test".getBytes))
res.append(tmp)
r= rs.next()
}
val res = res.toString
//do the following manipulations and return object (ImmutableBytesWritable, Put)
..............................
.......................
}
在main方法我在foreachRDD上述方法使用和保存到HBase的使用方法saveAsNewAPIHadoopDataset
streamData.foreachRDD(stream => stream.map (transferToHBasePut).saveAsNewAPIHadoopDataset(job.getConfiguration))
这对我现在的工作很好,但我有疑问关于这个过程:
以这种方式,我猜想,对于RDD的每个分区,都会创建一个到HBase的连接。我想知道是否可以扩大我的应用程序。假如我在1秒内有超过1000条记录,看起来在我的火花流式传输中会建立1000个连接。
这是从hbase读取数据的正确方法吗?在sparkStreaming中从HBase读取数据的最佳实践是什么?或者火花流不应该读取任何数据,它只是设计为将数据流写入数据库。
在此先感谢。
谢谢你回答我的问题。我通过将conf作为参数传递给方法transferToHBasePut来尝试解决方案。但正如你所说的foreach在每个执行者jvm进程上执行,单身人士不能从司机转移到工人。我认为这是因为配置不可分割。最后我发现有一种叫做foreachPartition的方法可以用于RDD。该方法将保证每个RDD分区只建立一次连接。 – Frankie