Writing a Spark RDD to an HBase table using Scala

I am trying to write a Spark RDD to an HBase table using Scala (which I have not used before). The entire code is as follows:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} 
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} 
import org.apache.hadoop.hbase.mapreduce.TableInputFormat 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
import scala.collection.JavaConverters._ 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.spark._ 
import org.apache.hadoop.mapred.JobConf 
import org.apache.spark.rdd.PairRDDFunctions 
import org.apache.spark.SparkContext._ 
import org.apache.hadoop.mapred.Partitioner; 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.client._ 

object HBaseWrite {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val outputTable = "tablename"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.setInt("hbase.client.scanner.caching", 10000)
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
    val x = 12
    val y = 15
    val z = 25
    var newarray = Array(x, y, z)
    val newrddtohbase = sc.parallelize(newarray)
    def convert(a: Int): Tuple2[ImmutableBytesWritable, Put] = {
      val p = new Put(Bytes.toBytes(a))
      p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
      new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(a.toString.getBytes()), p)
    }
    new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig)
    sc.stop()
  }
}

After doing this, the error I get when I run HBaseWrite.main(Array()) is:

org.apache.spark.SparkException: Task not serializable 

How can I proceed and get this working?


Passing the 'convert' method to map as a function literal solved this problem. – Shankar
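For illustration, a minimal untested sketch of what that comment describes, reusing newrddtohbase and jobConfig from the question and building the Put inside an anonymous function passed to map, so that no method defined inside main needs to be serialized:

new PairRDDFunctions(newrddtohbase.map { a =>
  // build the rowkey/Put pair inline instead of calling a method defined in main
  val p = new Put(Bytes.toBytes(a))
  p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
  (new ImmutableBytesWritable(a.toString.getBytes()), p)
}).saveAsHadoopDataset(jobConfig)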

Answers


For example, the following method takes an Int parameter and returns a Double:

var toDouble: (Int) => Double = a => { 
    a.toDouble 
} 

You can call it as toDouble(2) and it will return 2.0.
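A quick usage sketch:

println(toDouble(2))   // prints 2.0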

You can turn your method into a function literal in the same way:

val convert: (Int) => Tuple2[ImmutableBytesWritable, Put] = a => {
  val p = new Put(Bytes.toBytes(a))
  p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
  new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(a.toString.getBytes()), p)
}
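A function value defined this way can then be passed straight to map in the original job, for example (untested, reusing newrddtohbase and jobConfig from the question's code):

new PairRDDFunctions(newrddtohbase.map(convert)).saveAsHadoopDataset(jobConfig)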

Downvoter, come on... add a comment explaining why. – Shankar


I don't know why someone downvoted this. It removed the error for me. –


What you are doing wrong here is defining convert inside main. If you write the code this way, it should work:

object HBaseWrite {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseWrite").setMaster("local").set("spark.driver.allowMultipleContexts","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val outputTable = "tablename"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.setInt("hbase.client.scanner.caching", 10000)
    sparkConf.registerKryoClasses(Array(classOf[org.apache.hadoop.hbase.client.Result]))
    val jobConfig: JobConf = new JobConf(conf, this.getClass)
    jobConfig.setOutputFormat(classOf[TableOutputFormat])
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
    val x = 12
    val y = 15
    val z = 25
    var newarray = Array(x, y, z)
    val newrddtohbase = sc.parallelize(newarray)
    val convertFunc = convert _
    new PairRDDFunctions(newrddtohbase.map(convertFunc)).saveAsHadoopDataset(jobConfig)
    sc.stop()
  }

  def convert(a: Int): Tuple2[ImmutableBytesWritable, Put] = {
    val p = new Put(Bytes.toBytes(a))
    p.add(Bytes.toBytes("columnfamily"), Bytes.toBytes("col_1"), Bytes.toBytes(a))
    new Tuple2[ImmutableBytesWritable, Put](new ImmutableBytesWritable(a.toString.getBytes()), p)
  }
}

P.S.: The code is not tested, but it should work!


Thanks for the response, but the error is still the same. –


Could you please paste the error stack too? –


org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1446)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286) –