
How to handle exceptions in Spark and Scala

I am trying to handle common exceptions in Spark, such as a .map operation not working correctly on all elements of the data, or a FileNotFoundException. I have read all the existing questions and the following two posts:

https://rcardin.github.io/big-data/apache-spark/scala/programming/2016/09/25/try-again-apache-spark.html

https://www.nicolaferraro.me/2016/02/18/exception-handling-in-apache-spark

I have tried an inline Try statement on the line attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble, so that it reads attributes => Try(mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble)).

But that won't compile; the compiler then no longer recognises the .toDF() statement. I have also tried a Java-like try { } catch { } block, but I can't get the scoping right: df is then not returned. Does anyone know how to do this properly? Do I even need to handle these exceptions, since the Spark framework seems to deal with a FileNotFoundException already without me adding one? But I would like, for example, to generate an error giving the number of fields in the schema if the input file has the wrong number of columns (one possible check is sketched after the code below).
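For reference, here is a minimal sketch of why that inline attempt breaks .toDF(), and one way around it. (toMHealthUser is a hypothetical helper standing in for the 24-column construction shown below; it is not in the original code.)

import scala.util.{Success, Try}
import spark.implicits._

// Wrapping the construction in Try produces an RDD[Try[mHealthUser]].
// Spark has no Encoder for Try, which is why .toDF() stops compiling on that RDD.
// Keeping only the successes restores an RDD[mHealthUser] first:
val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\\t"))
  .map(attributes => Try(toMHealthUser(attributes)))  // toMHealthUser: hypothetical helper
  .collect { case Success(u) => u }  // RDD.collect(PartialFunction) filters and maps in one pass
  .toDF()
  .cache()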

Code below:

object DataLoadTest extends SparkSessionWrapper {

  /** Helper function to create a DataFrame from a textfile, re-used in subsequent tests */
  def createDataFrame(fileName: String): DataFrame = {

    import spark.implicits._

    //try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\\t"))
      // mHealthUser is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()
    df
    //} catch {
    //  case ex: FileNotFoundException => println(s"File $fileName not found")
    //  case unknown: Exception => println(s"Unknown exception: $unknown")
    //}
  }
}
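On the wrong-number-of-columns point, one way to get an informative error is to validate each split row before constructing the case class. This is only a sketch: toMHealthUser is again a hypothetical helper for the 24-column construction above, and 24 is read off the attributes(0) to attributes(23) accesses in the snippet.

val df = spark.sparkContext
  .textFile("/path/to/file" + fileName)
  .map(_.split("\\t"))
  .map { attributes =>
    // Fail fast with a message that names the expected field count
    require(attributes.length == 24,
      s"Expected 24 fields but found ${attributes.length} in $fileName")
    toMHealthUser(attributes)  // hypothetical helper, not in the original code
  }
  .toDF()
  .cache()

Note that require throws an IllegalArgumentException inside the Spark tasks, so the error surfaces when an action runs, not when the DataFrame is defined.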

Grateful for any suggestions. Thanks!

Answers


Another option would be to use the Try type in Scala.

For example:

import java.io.FileNotFoundException
import scala.util.{Try, Success, Failure}

def createDataFrame(fileName: String): Try[DataFrame] = {
  try {
    //create dataframe df
    Success(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      Failure(ex)
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      Failure(unknown)
  }
}

Now, on the calling side, handle it like this:

createDataFrame("file1.csv") match { 
    case Success(df) => { 
    // proceed with your pipeline 
    } 
    case Failure(ex) => //handle exception 
} 

This is slightly better than using Option, because the caller will know the reason for the failure and can handle it better.
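A small sketch of that advantage (the file names are placeholders, and createDataFrame is the Try-returning version above): the caller can keep the DataFrames that loaded and still report why each of the others failed:

import scala.util.{Success, Failure}

val results = Seq("file1.csv", "file2.csv", "file3.csv")
  .map(name => name -> createDataFrame(name))

// DataFrames that loaded successfully
val dfs = results.collect { case (_, Success(df)) => df }

// Each failure is still carried with its cause
results.foreach {
  case (name, Failure(ex)) => println(s"$name failed: ${ex.getMessage}")
  case _ => ()
}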


Either you let the exception be thrown out of the createDataFrame method (and handle it outside; a sketch of that follows the code below), or you change the signature to return Option[DataFrame]:

def createDataFrame(fileName: String): Option[DataFrame] = {

  import spark.implicits._

  try {
    val df = spark.sparkContext
      .textFile("/path/to/file" + fileName)
      .map(_.split("\\t"))
      // mHealthUser is the case class which defines the data schema
      .map(attributes => mHealthUser(attributes(0).toDouble, attributes(1).toDouble, attributes(2).toDouble,
        attributes(3).toDouble, attributes(4).toDouble,
        attributes(5).toDouble, attributes(6).toDouble, attributes(7).toDouble,
        attributes(8).toDouble, attributes(9).toDouble, attributes(10).toDouble,
        attributes(11).toDouble, attributes(12).toDouble, attributes(13).toDouble,
        attributes(14).toDouble, attributes(15).toDouble, attributes(16).toDouble,
        attributes(17).toDouble, attributes(18).toDouble, attributes(19).toDouble,
        attributes(20).toDouble, attributes(21).toDouble, attributes(22).toDouble,
        attributes(23).toInt))
      .toDF()
      .cache()

    Some(df)
  } catch {
    case ex: FileNotFoundException =>
      println(s"File $fileName not found")
      None
    case unknown: Exception =>
      println(s"Unknown exception: $unknown")
      None
  }
}
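For completeness, the first alternative (letting createDataFrame throw, and handling it at the call site) would look roughly like this — a sketch, assuming the original DataFrame-returning signature:

import java.io.FileNotFoundException

try {
  val df = DataLoadTest.createDataFrame("file1.csv")
  // proceed with your pipeline
} catch {
  case ex: FileNotFoundException => println(s"File not found: ${ex.getMessage}")
  case unknown: Exception => println(s"Unknown exception: $unknown")
}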

Edit: on the calling side of createDataFrame there are several patterns. If you are processing several file names, you can for example do:

val dfs : Seq[DataFrame] = Seq("file1","file2","file3").map(createDataFrame).flatten 

If you are working with a single file name, you can do:

createDataFrame("file1.csv") match { 
    case Some(df) => { 
    // proceed with your pipeline 
    val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1)) 
    } 
    case None => println("could not create dataframe") 
} 

@Raphael_Roth Thanks, that works, but it then breaks the next line, which expects a DataFrame rather than an Option[DataFrame]: 'val df2 = df.filter($"activityLabel" > 0).withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))'. I have changed df2 to Option[DataFrame], but .filter now won't compile. – LucieCBurgess


@LucieCBurgess Read up on how to use Option in Scala, or show us the code where you use 'createDataFrame' — are you iterating over file names? –


@Raphael_Roth: I tried this: 'val df: DataFrame = DataLoadTest.createDataFrame(fileName).getOrElse(None)'. This is in a separate class, so that I'm not reassigning df to a val. I get the error: "Expression of type Serializable doesn't conform to expected type sql.DataFrame" – LucieCBurgess
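For what it's worth, that error comes from getOrElse(None): the compiler has to unify DataFrame and None.type, and their closest common supertype is Serializable. Two sketches that keep the types straight (assuming the Option-returning createDataFrame above; the filter line is taken from the earlier comment):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.when
import spark.implicits._ // for the $"..." column syntax

// 1) Supply a fallback of the same type as the success case
val df: DataFrame = DataLoadTest.createDataFrame(fileName)
  .getOrElse(spark.emptyDataFrame)

// 2) Transform inside the Option and unwrap only at the end
DataLoadTest.createDataFrame(fileName).map { df =>
  df.filter($"activityLabel" > 0)
    .withColumn("binaryLabel", when($"activityLabel".between(1, 3), 0).otherwise(1))
} match {
  case Some(df2) => df2.show() // proceed with the pipeline
  case None => println("could not create dataframe")
}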