2015-03-19 118 views
1

请看看下面的火花流用Scala编写代码:星火流 - 问题与传递参数

object HBase { 
    var hbaseTable = "" 
    val hConf = new HBaseConfiguration() 
    hConf.set("hbase.zookeeper.quorum", "zookeeperhost") 

    def init(input: (String)) { 
    hbaseTable = input 
    } 
    def display() { 
    print(hbaseTable) 
    } 
    def insertHbase(row: (String)) { 
    val hTable = new HTable(hConf,hbaseTable) 
    } 
} 

object mainHbase { 
    def main(args : Array[String]) { 
    if (args.length < 5) { 
     System.err.println("Usage: MetricAggregatorHBase <zkQuorum> <group> <topics> <numThreads> <hbaseTable>") 
     System.exit(1) 
    } 
    val Array(zkQuorum, group, topics, numThreads, hbaseTable) = args 
    HBase.init(hbaseTable) 
    HBase.display() 
    val sparkConf = new SparkConf().setAppName("mainHbase") 
    val ssc = new StreamingContext(sparkConf, Seconds(10)) 
    ssc.checkpoint("checkpoint") 
    val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap 
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) 
    val storeStg = lines.foreachRDD(rdd => rdd.foreach(HBase.insertHbase)) 
    lines.print() 
    ssc.start() 
    } 
} 

我试图通过调用HBase.init方法来初始化的对象HBase参数hbaseTable。它正确设置参数。我确认通过在下一行中调用HBase.display方法。

但是,当调用HBase.insertHbase方法中的foreachRDD被调用时,其抛出错误hbaseTable未设置。

更新,出现异常:

java.lang.IllegalArgumentException: Table qualifier must not be empty 
     org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:179) 
     org.apache.hadoop.hbase.TableName.isLegalTableQualifierName(TableName.java:149) 
     org.apache.hadoop.hbase.TableName.<init>(TableName.java:303) 
     org.apache.hadoop.hbase.TableName.createTableNameIfNecessary(TableName.java:339) 
     org.apache.hadoop.hbase.TableName.valueOf(TableName.java:426) 
     org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:156) 

能否请你让我知道如何使此代码的工作。

+0

请用引发的确切异常来更新您的问题。 – lambdas 2015-03-19 05:24:27

+0

@lambdas更新了抛出的异常。 – vijay 2015-03-19 05:48:16

回答

2

“这段代码在哪里运行” - 这是我们为了了解正在发生的事情而需要问的问题。

HBase是一个Scala对象 - 根据定义,它是一个单例构造,它在JVM中以'只有一次'语义进行初始化。

在初始化点,HBase.init(hbaseTable)在该Spark应用程序的驱动程序中执行,使用驱动程序的VM中的给定值初始化此对象。

但是,当我们执行:rdd.foreach(HBase.insertHbase)时,闭包将作为每个执行器上的任务执行,该执行器承载给定RDD的分区。此时,对每个执行程序在每个VM上初始化对象HBase。正如我们所看到的那样,在这个对象上没有发生初始化。

这里有两种选择:

我们可以添加一些检查“将IsInitialized”到HBase对象,并新增-now conditional-调用每次调用初始化foreach。 另一种选择是使用

rdd.foreachPartitition{partition => 
    HBase.initialize(...) 
    partition.foreach(elem => HBase.insert(elem)) 
} 

这种结构将通过元件的每个分区中的量分期偿还任何初始化。也可以将它与初始化检查结合起来以防止不必要的引导工作。

+0

非常感谢。这对我有效。 – vijay 2015-03-20 10:10:43