我是Spark,Scala和Cassandra的新手。 使用Spark我想从MySQL获取一些ID。Spark Shell:任务不可序列化
import org.apache.spark.rdd.JdbcRDD
import java.sql.{Connection, DriverManager, ResultSet}
Class.forName("com.mysql.jdbc.Driver").newInstance
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ;
myRDD.foreach(println)
我能够看到控制台上打印的ID。
现在对于每个提取的ID,我需要对Cassandra中的表执行Sum操作。
我创建了我可以通过传递个人ID
object HelloWorld {
def sum(id : String): Unit = {
val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum
println(each_spark_rdd)
}
}
调用一个函数,并宣布uplink_rdd作为
val uplink_rdd = sc.cassandraTable("keyspace", "table")
我能够通过将单独的ID调用该函数,并且可以看到总和
scala> HelloWorld.sum("5")
50
当我试图在每个fetc上运行相同的功能^ h id作为
myRDD.map(HelloWorld.sum)
or
myRDD.foreach(HelloWorld.sum)
or
for (id <- myRDD) HelloWorld.sum(id)
它赋予相同的例外,因为例外
org.apache.spark.SparkException:任务不能序列在 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner。 Scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clea n(SparkContext.scala:2055)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:911)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:910)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala :111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:54) at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:59) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC万国表$$ $$ iwC。(:61) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:63) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:65)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:67)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:69)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:71)at $ iwC $$ iwC $$:iwC $$ iwC。(:73)at $ iwC $$ iwC $$ iwC。(:75)at $ iwC $$ iwC。(:77) at $ iwC。(:79)at(:81 ): 。(:85)at。()at 。(:7)at。()at $ print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.Delegating MethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain。斯卡拉:1065) 在 org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq $ 1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl。 SparkILoop.reallyInterpret $ 1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala: 814)at org.apache.spark.repl.SparkILoop.processLine $ 1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop $ 1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ SparkILoop $$ loop(SparkILoop.scala :670) 在 org.apache.spark.repl.SparkILoop $$ anonfun $ $组织阿帕奇$火花$ REPL $ SparkILoop $$过程$ 1.适用$ MCZ $ SP(SparkILoop.scala:997) 在 组织。 apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$进程$ 1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$进程$ 1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader $ .savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ Sparkiloop $$进程(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main $ .main(Main.scala:31)at org.apache.spark.repl.Main.main(Main.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java :498)at org.apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache。 spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)引发 作者:java.io.NotSerializableException:org.apache.spark.SparkConf
我尝试添加到@Transient RDDS看完Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually为
@transient val myRDD = new JdbcRDD ...
@transient val uplink_rdd = sc.cassandra....
但仍然摹同样的错误。
请让我知道如何从Cassandara表中找到从Mysql中获取的每个id的总和。
的问题是,你基本上是试图进行转型中的作用 - 在星火变革和行动不能嵌套。当你调用'foreach'时,Spark试图序列化'HelloWorld.sum'将它传递给每个执行者 - 但是为了这样做,它也必须序列化函数的闭包,其中包括'uplink_rdd'(并且这不是可序列化的)。 但是,当你发现自己试图做这种事情时,通常只是表明你想要使用“join”或类似的东西。 – Alec
你检查了这个吗? [链接](https://stackoverflow.com/questions/32661018/scala-spark-task-not-serializable?rq=1) – Cfuentes