2017-04-20 145 views
0

我想流数据插入到hbase; 这是我的代码:火花流HBase的错误

val tableName = "streamingz" 
val conf = HBaseConfiguration.create() 
conf.addResource(new Path("file:///opt/cloudera/parcels/CDH-5.4.7-1.cdh5.4.7.p0.3/etc/hbase/conf.dist/hbase-site.xml")) 
conf.set(TableInputFormat.INPUT_TABLE, tableName) 

val admin = new HBaseAdmin(conf) 
if (!admin.isTableAvailable(tableName)) { 
    print("-----------------------------------------------------------------------------------------------------------") 
    val tableDesc = new HTableDescriptor(tableName) 
    tableDesc.addFamily(new HColumnDescriptor("z1".getBytes())) 
    tableDesc.addFamily(new HColumnDescriptor("z2".getBytes())) 
    admin.createTable(tableDesc) 
} else { 
    print("Table already exists!!--------------------------------------------------------------------------------------") 
} 
val ssc = new StreamingContext(sc, Seconds(10)) 
val topicSet = Set("fluxAstellia") 
val kafkaParams = Map[String, String]("metadata.broker.list" - > "10.32.201.90:9092") 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicSet) 
val lines = stream.map(_._2).map(_.split(" ", -1)).foreachRDD(rdd => { 
    if (!rdd.partitions.isEmpty) { 
     val myTable = new HTable(conf, tableName) 
     rdd.map(rec => { 
      var put = new Put(rec._1.getBytes) 
      put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec._2)) 
      myTable.put(put) 
     }).saveAsNewAPIHadoopDataset(conf) 
     myTable.flushCommits() 
    } else { 
     println("rdd is empty") 
    } 

}) 


ssc.start() 
ssc.awaitTermination() 

} 
} 

我得到这个错误:

:66: error: value _1 is not a member of Array[String] 
     var put = new Put(rec._1.getBytes) 

我初学者我这样怎么能不修复这个错误,我有一个问题:

哪里准确地创建表格;流媒体过程之外还是内部?

谢谢

回答

0

你的错误,基本上就行var put = new Put(rec._1.getBytes) 您可以拨打_n仅在地图(_1关键和_2值)或元组。
rec是你通过空格字符流中分割字符串得到的字符串数组。如果你是在第一个元素之后,你会把它写成var put = new Put(rec(0).getBytes)。同样,在下一行,你会写为put.add("z1".getBytes(), "name".getBytes(), Bytes.toBytes(rec(1)))

+0

什么有关创建HBase的表。我应该在哪里创建它? –

+0

我得到这个新的错误哥'错误的jobscheduler:错误运行的工作流的工作1492790490000 ms.0 org.apache.spark.SparkException:任务不serializable' –

+0

它应该告诉你的是什么类没有在堆栈跟踪序列化。不管你的map()关闭是什么,都应该是可序列化的。我的猜测是HTable不可序列化。您可以使它序列化与'VAL myTable的=新HTable(CONF,表名)与java.io.Serializable'更换线或者是将其标记为'@Transient lazy'所以每个执行人如果那是创建自己的实例你想做。 – sparker