I have the following task ahead of me: parallel processing with Spark and Cassandra.
When the user runs the spark-submit command, a configuration file with a set of IP addresses is provided.
Let's say the array looks like this:

    val ips = Array(1, 2, 3, 4, 5)

The array can hold up to 100,000 values.
For every element of the array I should read data from Cassandra, perform some computation, and save the result back to Cassandra.
If I do:

    ips.foreach(ip => {
      // read data from Cassandra for the specific "ip"; for each IP there is a
      // different amount of data to read (within the function I determine the
      // start and end date for each IP)
      // process it
      // save it back to Cassandra
    })

it works correctly. However, I believe the processing runs sequentially, so I am not taking advantage of any parallelism.
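Spelled out with the DataStax spark-cassandra-connector (an assumption on my part, as are the keyspace/table names `my_keyspace.measurements` / `my_keyspace.results` and the placeholder doubling computation), the sequential variant might look like the sketch below. Each `saveToCassandra` is a blocking Spark action, so the IPs are processed strictly one after another:

```scala
import com.datastax.spark.connector._
import org.apache.spark.SparkContext

// Hypothetical schema and computation, for illustration only.
def processSequentially(sc: SparkContext, ips: Array[Int]): Unit = {
  ips.foreach { ip =>
    val rows = sc.cassandraTable("my_keyspace", "measurements")
      .where("ip = ?", ip)                  // server-side filter per IP
    val processed = rows.map(row => (ip, row.getDouble("value") * 2))
    processed.saveToCassandra("my_keyspace", "results",
      SomeColumns("ip", "value"))           // blocks until this IP finishes
  }
}
```

Each individual read is still distributed across the cluster, but only one IP's job runs at a time.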
On the other hand, if I do:

    val IPRdd = sc.parallelize(Array(1, 2, 3, 4, 5))
    IPRdd.foreach(ip => {
      // read data from Cassandra; I need to use the Spark context to make the query
      // process it
      // save it back to Cassandra
    })
I get a serialization exception, because Spark tries to serialize the SparkContext, which is not serializable.
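The mechanism behind the exception can be reproduced without Spark at all: Java serialization of a closure fails as soon as the closure captures a reference to a non-serializable object, which is exactly what happens when `sc` appears inside the `foreach` body. A minimal stand-alone sketch (the `Context` class here is just a stand-in for SparkContext):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureDemo {
  // Stand-in for SparkContext: deliberately not Serializable.
  class Context { def query(ip: Int): Int = ip * 10 }

  def main(args: Array[String]): Unit = {
    val ctx = new Context

    // The lambda itself is serializable, but it captures `ctx`, so
    // serializing it fails at the captured field -- just as Spark's
    // ClosureCleaner fails on a captured SparkContext.
    val badClosure: Int => Int = ip => ctx.query(ip)

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(badClosure)
      println("serialized OK")
    } catch {
      case e: NotSerializableException =>
        println(s"NotSerializableException: ${e.getMessage}")
    }
  }
}
```

Spark runs this check eagerly on the driver (the `ClosureCleaner.ensureSerializable` frame in the trace below), which is why the job fails before anything is even shipped to the executors.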
How can I make this work while still taking advantage of parallelism?
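One commonly suggested pattern (a sketch under assumptions: the DataStax spark-cassandra-connector, a hypothetical `my_keyspace.measurements` table keyed by `ip`, and a placeholder computation) is to keep the SparkContext on the driver and express the whole thing as a single distributed pipeline. `joinWithCassandraTable` looks each IP up by partition key on the executors, so the SparkContext never needs to be serialized:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object ParallelIps {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("parallel-ips")
      .set("spark.cassandra.connection.host", "127.0.0.1") // assumed host
    val sc = new SparkContext(conf)

    val ips = Array(1, 2, 3, 4, 5)

    // Distribute the IPs, then join each one against the table by its
    // partition key. Reads, processing and writes all run in parallel
    // on the executors; sc itself never leaves the driver.
    sc.parallelize(ips)
      .map(ip => Tuple1(ip))
      .joinWithCassandraTable("my_keyspace", "measurements")
      .map { case (Tuple1(ip), row) => (ip, row.getDouble("value") * 2) }
      .saveToCassandra("my_keyspace", "results", SomeColumns("ip", "value"))
  }
}
```

If the per-IP start/end dates make a plain key join awkward, another common approach is `mapPartitions` with `CassandraConnector.withSessionDo`, since `CassandraConnector` (unlike SparkContext) is serializable and can safely be captured by the closure.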
Thanks.
EDIT
This is the exception I get:

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:919)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
        at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:59)
        at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1.apply(WibeeeBatchJob.scala:54)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main(WibeeeBatchJob.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
    Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@...)
        - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, name: sc$1, type: class org.apache.spark.SparkContext)
        - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1, )
        - field (class: com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1, name: $outer, type: class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1)
        - object (class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob$$anonfun$main$1$$anonfun$apply$1)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
(Comment) What is the specific exception, and what specific code is causing the problem? –