2017-04-25 42 views
0

我想使用位于用户设计的函数内的另一个类中的方法,但它不起作用。在UDF函数中使用方法Spark Scala

我有一个方法:

def traitementDataFrameEleve(sc:SparkSession, dfRedis:DataFrame, domainMail:String, dir:String):Boolean ={ 
    def loginUDF = udf((sn: String, givenName:String) => { 
      LoginClass.GenerateloginPersone(sn,givenName,dfr) 
      }) 

    dfEleve.withColumn("ENTPersonLogin",loginUDF(dfEleve("sn"),dfEleve("givenName"))) 
} 

LoginClass是包含GenerateloginPersone方法的类。

输出错误:

org.apache.spark.SparkException: Failed to execute user defined function(anonfun$loginUDF$1$1: (string, string) => string) 
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) 
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: java.lang.NullPointerException 
    at org.apache.spark.sql.Dataset.schema(Dataset.scala:410) 
    at org.apache.spark.sql.Dataset.printSchema(Dataset.scala:419) 
    at IntegrationDonneesENTLea_V1_AcBordeaux.LoginClass$.GenerateloginPersone(LoginClass.scala:16) 
    at IntegrationDonneesENTLea_V1_AcBordeaux.Eleve$$anonfun$loginUDF$1$1.apply(Eleve.scala:25) 
    at IntegrationDonneesENTLea_V1_AcBordeaux.Eleve$$anonfun$loginUDF$1$1.apply(Eleve.scala:23) 
    ... 16 more 

谢谢。

+0

“但不工作” - 什么不行?这是如何失败的(编译错误?运行时异常?意外输出?)。请** [edit](http://stackoverflow.com/posts/43616403/edit)问题**以包含所有必要的信息。 –

+0

我已更新我的帖子@TzachZohar –

+0

什么是dfr?你的意思是dfRedis? –

回答

2

不允许访问:

  • 分布式的数据结构(如DatasetRDD)。
  • SparkConext/SparkSession

从火花任务(变换,udf应用程序)。这就是你获得NPE的原因。

+0

它不存在其他方式来做到这一点? –

+1

@MounirHamdane做_what_?你可以从UDF中调用一个方法(根据你的帖子标题),这里的问题是你的方法的_contents_ - GenerateloginPersone似乎调用Dataset.printSchema - 这是你不能做的事情。由于我们无法看到'GenerateloginPersone'的实现,所以我们不能提供有关如何实现或应该实现的更多信息 - 我们不知道它的功能。所以,总结一下:是的,您可以从UDF内部调用方法;但是这些方法不能尝试使用'SparkContext' /'SparkSession'或者任何使用这些方法的东西(比如'Dataset' /'RDD')。 –

+0

是的,我可以在一个方法中使用,但不能传递一个数据框参数 –