2017-02-27 312 views
2

从数据帧Wrting到HBase的(MapRDB)我试图写在MAPR平台(5.2.0)在星火2.0 CSV文件到HBase的表。在星火2

我的程序如下:

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName} 
import org.apache.hadoop.hbase.client.{HBaseAdmin, Put} 
import org.apache.hadoop.hbase.io.ImmutableBytesWritable 
import org.apache.hadoop.hbase.mapred.TableOutputFormat 
import org.apache.hadoop.hbase.util.Bytes 
import org.apache.hadoop.mapred.JobConf 
import org.apache.spark.sql.types.{StringType, StructField, StructType} 
import org.apache.spark.{SparkConf, SparkContext} 

/** 
    * Created by Mehdi on 2/25/2017. 
*/ 

object CSVLoader { 

    def main(args: Array[String]) { 

    val hbaseConfig = HBaseConfiguration.create() 
    val admin = new HBaseAdmin(hbaseConfig) 
    val tableName = "/tmp/TableCount" 


    val convertToPut= (row: org.apache.spark.sql.Row) => { 
     // create a composite row key: sensorid_date time 

     val rowkey = (row.getString(0)+row.getString(1)+row.getString(2)+row.getString(3)).hashCode() 
     val put = new Put(Bytes.toBytes(rowkey)) 
     val cfName=Bytes.toBytes("Column_Familly_1") 
     // add to column family data, column data values to put object 
     put.addColumn(cfName, Bytes.toBytes("servedIMSIng"), Bytes.toBytes(row.getString(0))) 
     put.addColumn(cfName, Bytes.toBytes("ggsnIPAddress"), Bytes.toBytes(row.getString(1))) 
     put.addColumn(cfName, Bytes.toBytes("chargingID"), Bytes.toBytes(row.getString(2))) 
     put.addColumn(cfName, Bytes.toBytes("sgsnIPAddress"), Bytes.toBytes(row.getString(3))) 
     (new ImmutableBytesWritable(Bytes.toBytes(rowkey)), put) 
    } 

    import org.apache.spark.sql.SparkSession 

    val spark = SparkSession 
     .builder() 
     .appName("Spark SQL basic example") 
     //.config("spark.some.config.option", "some-value") 
     .getOrCreate() 

    // For implicit conversions like converting RDDs to DataFrames 
    import spark.implicits._ 


    //val peopleRDD = sc.textFile("examples/src/main/resources/people.txt") 

    // The schema is encoded in a string 
    val schemaString = "servedIMSI,ggsnIPAddress,chargingID,sgsnIPAddress" 

    // Generate the schema based on the string of schema 
    val fields = schemaString.split(",") 
     .map(fieldName => StructField(fieldName, StringType, nullable = true)) 
    val schema = StructType(fields) 

    val df = spark.read 
     .schema(schema) 
     .option("header", "true") 
     .csv("files/Sample_CSV.csv") 
    // .select("servedIMSI","ggsnIPAddress","chargingID","sgsnIPAddress") 
    // .option("mode", "DROPMALFORMED") 

    df.show() 

    // set up Hadoop HBase configuration using TableOutputFormat 

    hbaseConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
    val jobConfig = new JobConf(hbaseConfig, this.getClass) 
    jobConfig.setOutputFormat(classOf[TableOutputFormat]) 
    jobConfig.set(TableOutputFormat.OUTPUT_TABLE, tableName) 
    df.map(convertToPut).rdd.saveAsHadoopDataset(jobConfig) 

    } 
} 

的df.show作品在展示我的CSV文件,但它不能写入到HBase的。我收到以下错误:

not take effect. 
+----------+-------------+----------+-------------+ 
|servedIMSI|ggsnIPAddress|chargingID|sgsnIPAddress| 
+----------+-------------+----------+-------------+ 
|  sd|   as|  rt|   dd| 
|  mahdi|   red|   d|   dd| 
+----------+-------------+----------+-------------+ 

Exception in thread "main" scala.reflect.internal.Symbols$CyclicReference: illegal cyclic reference involving object InterfaceStability 
     at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1502) 
     at scala.reflect.internal.Symbols$Symbol$$anonfun$info$3.apply(Symbols.scala:1500) 
     at scala.Function0$class.apply$mcV$sp(Function0.scala:34) 
     at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 
     at scala.reflect.internal.Symbols$Symbol.lock(Symbols.scala:546) 
     at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1500) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:744) 
     at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:744) 
     at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1514) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$info$1.apply(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.info(SynchronizedSymbols.scala:127) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$2.info(SynchronizedSymbols.scala:171) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$coreLookup$1(JavaMirrors.scala:992) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$lookupClass$1(JavaMirrors.scala:998) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$classToScala1(JavaMirrors.scala:1003) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$classToScala$1.apply(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$$anonfun$toScala$1.apply(JavaMirrors.scala:97) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache$$anonfun$toScala$1.apply(TwoWayCaches.scala:38) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:33) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:95) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:980) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy.<init>(JavaMirrors.scala:163) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$JavaAnnotationProxy$.apply(JavaMirrors.scala:162) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) 
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) 
     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror.scala$reflect$runtime$JavaMirrors$JavaMirror$$copyAnnotations(JavaMirrors.scala:683) 
     at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.load(JavaMirrors.scala:733) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:142) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anonfun$typeParams$1.apply(SynchronizedSymbols.scala:133) 
     at scala.reflect.runtime.Gil$class.gilSynchronized(Gil.scala:19) 
     at scala.reflect.runtime.JavaUniverse.gilSynchronized(JavaUniverse.scala:16) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:123) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.gilSynchronizedIfNotThreadsafe(SynchronizedSymbols.scala:168) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$class.typeParams(SynchronizedSymbols.scala:132) 
     at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$8.typeParams(SynchronizedSymbols.scala:168) 
     at scala.reflect.internal.Types$NoArgsTypeRef.typeParams(Types.scala:1926) 
     at scala.reflect.internal.Types$NoArgsTypeRef.isHigherKinded(Types.scala:1925) 
     at scala.reflect.internal.transform.UnCurry$class.scala$reflect$internal$transform$UnCurry$$expandAlias(UnCurry.scala:22) 
     at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:26) 
     at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:24) 
     at scala.collection.immutable.List.loop$1(List.scala:173) 
     at scala.collection.immutable.List.mapConserve(List.scala:189) 
     at scala.reflect.internal.tpe.TypeMaps$TypeMap.mapOver(TypeMaps.scala:115) 
     at scala.reflect.internal.transform.UnCurry$$anon$2.apply(UnCurry.scala:46) 
     at scala.reflect.internal.transform.Transforms$class.transformedType(Transforms.scala:43) 
     at scala.reflect.internal.SymbolTable.transformedType(SymbolTable.scala:16) 
     at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:225) 
     at scala.reflect.internal.Types$TypeApiImpl.erasure(Types.scala:218) 
     at org.apache.spark.sql.catalyst.ScalaReflection$class.getClassNameFromType(ScalaReflection.scala:779) 
     at org.apache.spark.sql.catalyst.ScalaReflection$.getClassNameFromType(ScalaReflection.scala:39) 
     at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$dataTypeFor(ScalaReflection.scala:78) 
     at org.apache.spark.sql.catalyst.ScalaReflection$.dataTypeFor(ScalaReflection.scala:63) 
     at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:53) 
     at org.apache.spark.sql.Encoders$.product(Encoders.scala:274) 
     at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:47) 
     at CSVLoader$.main(CSVLoader.scala:102) 
     at CSVLoader.main(CSVLoader.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

是writitng到HBase的时候我的代码是coorect。我使用submit和本地[2]模式运行我的应用程序。

我的SBT文件如下:

name := "Asef" 

version := "1.0" 

scalaVersion := "2.11.1" 


resolvers += "mapr" at "http://repository.mapr.com/maven/" 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.1", 
"org.apache.spark" % "spark-sql_2.11" % "2.0.1", 
    "org.apache.hbase" % "hbase-client" % "1.1.1-mapr-1602-m7-5.2.0", 
    "org.apache.hbase" % "hbase-common" % "1.1.1-mapr-1602-m7-5.2.0", 
    "org.apache.hbase" % "hbase-server" % "1.1.1-mapr-1602-m7-5.2.0" excludeAll ExclusionRule(organization = "org.mortbay.jetty") 
) 

回答

1

我的错误是在程序:

df.rdd.map(convertToPut).saveAsHadoopDataset(jobConfig)