2016-08-23 60 views
0

我是Spark,Scala和Cassandra的新手。 使用Spark我想从MySQL获取一些ID。Spark Shell:任务不可序列化

import org.apache.spark.rdd.JdbcRDD 
import java.sql.{Connection, DriverManager, ResultSet} 
Class.forName("com.mysql.jdbc.Driver").newInstance 

import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf 

val myRDD = new JdbcRDD(sc,() => DriverManager.getConnection(url,username,password) ,"select id from user limit ?, ?",1, 20, 10, r => r.getString("id")) ; 
myRDD.foreach(println) 

我能够看到控制台上打印的ID。

现在对于每个提取的ID,我需要对Cassandra中的表执行Sum操作。

我创建了我可以通过传递个人ID

object HelloWorld { 
     def sum(id : String): Unit = { 
     val each_spark_rdd = uplink_rdd.select("number").where("id=?",Id).as((c: Int) => c).sum 
     println(each_spark_rdd) 
     } 
    } 

调用一个函数,并宣布uplink_rdd作为

val uplink_rdd = sc.cassandraTable("keyspace", "table") 

我能够通过将单独的ID调用该函数,并且可以看到总和

scala> HelloWorld.sum("5") 
50 

当我试图在每个fetc上运行相同的功能^ h id作为

myRDD.map(HelloWorld.sum) 
or 
myRDD.foreach(HelloWorld.sum) 
or 
for (id <- myRDD) HelloWorld.sum(id) 

它赋予相同的例外,因为例外

org.apache.spark.SparkException:任务不能序列在 org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner。 Scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clea n(SparkContext.scala:2055)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:911)at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1.apply(RDD.scala:910)at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala :111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)at org.apache.spark.rdd.RDD.foreach(RDD.scala:910)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:54) at $ iwC $$ iwC $$ iwC $$ iwC $ $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:59) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ IWC万国表$$ $$ iwC。(:61) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:63) at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:65)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:67)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:69)at $ iwC $$ iwC $$ iwC $$ iwC $$ iwC。(:71)at $ iwC $$ iwC $$:iwC $$ iwC。(:73)at $ iwC $$ iwC $$ iwC。(:75)at $ iwC $$ iwC。(:77) at $ iwC。(:79)at(:81 ): 。(:85)at。()at 。(:7)at。()at $ print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun.reflect.Delegating MethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498)at org.apache.spark.repl.SparkIMain $ ReadEvalPrint.call(SparkIMain。斯卡拉:1065) 在 org.apache.spark.repl.SparkIMain $ Request.loadAndRun(SparkIMain.scala:1346) 在 org.apache.spark.repl.SparkIMain.loadAndRunReq $ 1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.spark.repl。 SparkILoop.reallyInterpret $ 1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala: 814)at org.apache.spark.repl.SparkILoop.processLine $ 1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop $ 1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ SparkILoop $$ loop(SparkILoop.scala :670) 在 org.apache.spark.repl.SparkILoop $$ anonfun $ $组织阿帕奇$火花$ REPL $ SparkILoop $$过程$ 1.适用$ MCZ $ SP(SparkILoop.scala:997) 在 组织。 apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$进程$ 1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop $$ anonfun $ org $ apache $ spark $ repl $ SparkILoop $$进程$ 1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader $ .savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org $ apache $ spark $ repl $ Sparkiloop $$进程(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) at org.apache.spark.repl.Main $ .main(Main.scala:31)at org.apache.spark.repl.Main.main(Main.scala)at sun.reflect.NativeMethodAccessorImpl.invoke0(Native方法)在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java :498)at org.apache.spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache。 spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)引发 作者:java.io.NotSerializableException:org.apache.spark.SparkConf

我尝试添加到@Transient RDDS看完Apache Spark: "SparkException: Task not serializable" in spark-shell for RDD constructed manually

@transient val myRDD = new JdbcRDD ... 
@transient val uplink_rdd = sc.cassandra.... 

但仍然摹同样的错误。

请让我知道如何从Cassandara表中找到从Mysql中获取的每个id的总和。

+0

的问题是,你基本上是试图进行转型中的作用 - 在星火变革和行动不能嵌套。当你调用'foreach'时,Spark试图序列化'HelloWorld.sum'将它传递给每个执行者 - 但是为了这样做,它也必须序列化函数的闭包,其中包括'uplink_rdd'(并且这不是可序列化的)。 但是,当你发现自己试图做这种事情时,通常只是表明你想要使用“join”或类似的东西。 – Alec

+0

你检查了这个吗? [链接](https://stackoverflow.com/questions/32661018/scala-spark-task-not-serializable?rq=1) – Cfuentes

回答

0

您的代码正试图在myRDD转换中使用uplink_rdd。应用于RDD的闭包不能包含另一个RDD。

你应该做一些沿着joinWithCassandraTable的行,它将并行分布(ly?)使用来自myRDD的信息从Cassandra中提取数据。这工作,如果你是从卡桑德拉

拉单分区键

the Docs

另一种选择是使用手动连接从连接器使用的池提取。

val cc = CassandraConnector(sc.getConf) 
myRDD.mapPartitions { it => 
    cc.withSessionDo { session => 
    session.execute("whatever query you want") 
    } 
} 

如果你实际上是在多个分区上求和的卡桑德拉你需要 为每个ID的新RDD。

喜欢的东西

myRDD.collect.foreach(HelloWorld.sum) 
相关问题