2015-10-16 111 views
0

我正在使用Spark 1.2进行某些数据处理。我使用StructField创建了一个模式。Apache Spark SQL NumberFormatException

val exampleSchema = StructType(Array(StructField("SerialNo",StringType,true),StructField("Date",DateType,true),StructField("Value",IntegerType,true))) 

val exampleRowRDD = rawData.map(_.split(",")).map(p => Row(p(0),repairDate(p(1)), p(2).toInt)) 


val schemaRDD = sqlContext.applySchema(exampleRowRDD, exampleSchema) 

schemaRDD.registerTempTable("ExampleTable") 

现在,当我运行对表的查询,我得到这个错误:

java.lang.NumberFormatException: For input string: "" 
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) 
    at java.lang.Integer.parseInt(Integer.java:504) 
    at java.lang.Integer.parseInt(Integer.java:527) 
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) 
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31) 
    at $line16.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:49) 
    at $line16.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$2.apply(<console>:49) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:122) 
    at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:112) 
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) 
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) 
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:228) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) 

我知道我的数据有空值,我试图用选项,可将空值,有些,None方法,但是Spark SQL无法识别数据类型。所以底线是我的“价值”列的所有值应该是一个整数,但null不能转换为整数。 我该怎么办?工作示例代码示例将不胜感激。

回答

0

您可以创建一个返回Option[Int]的功能,而不是使用Try投掷NumberFormatException的:

import scala.util.Try 

def parseInt(s: String): Option[Int] = Try(s.toInt).toOption 

,您可以为使用:

val bad1 = "" 
val bad2 = (Int.MaxValue + 1L).toString 
val good = "5" 

parseInt(bad1) // None 
parseInt(bad2) // None 
parseInt(good) // Some(5) 

你需要去适应你的代码来处理Option[Int]当然。

  • 你可以在你的代码
  • 的休息与工作Option[Int]或提供一个默认值:parseInt(bad1).getOrElse(defaultValue)
+0

谢谢你答复彼得。然而,我已经尝试过这一点,但运行查询对火花表它会引发错误。 '线程中的异常'主要“scala.MatchError:无(类scala.None $) \t at scala.tools.nsc.typechecker.Implicits $ ImplicitSearch.scala $ tools $ nsc $ typechecker $ Implicits $ ImplicitSearch $$ typedImplicit Implicits.scala:458)' – suri

+0

你会得到哪些错误? –

+0

我有两个文件。文件1具有空值。我用你的方法从文件一创建表。比我加入了两张桌子。到目前为止好。然后我尝试将输出RDD注册为新表,并且当出现此错误时: - 'machineJoinedEvents.registerTempTable(“machineJoinedEventsTable”) 错误: 编译期间未捕获的异常:scala.MatchError scala.MatchError:None(of类的scala.None $) 该条目似乎杀死了编译器。我应该重播 你的会话?除了最后一行,我可以重新运行每行。 [y/n]' – suri