2016-04-21 63 views
0

我有以下任务在我之前。Spark和Cassandra并行处理

用户执行spark提交命令时提供一组IP地址配置文件。

比方说,该数组看起来像这样:

val ips = Array(1,2,3,4,5)

可以有高达阵列100.000值..

对于数组的所有元素,我应该读Cassandra的数据,执行一些计算并将数据返回给Cassandra。

如果我做的:

ips.foreach(ip =>{ 
- read data from Casandra for specific "ip" // for each IP there is different amount of data to read (within the functions I determine start and end date for each IP) 
- process it 
- save it back to Cassandra}) 

能正常工作。

我相信,过程顺序运行;我不利用并行性。

,如果我做的另一方面:

val IPRdd = sc.parallelize(Array(1,2,3,4,5)) 
IPRdd.foreach(ip => { 
- read data from Cassandra // I need to use spark context to make the query 
-process it 
save it back to Cassandra}) 

我得到的序列化异常,因为火花试图序列火花背景下,这是不序列化。

如何使这项工作,但仍然利用并行性。

感谢

编辑

这是execption我得到:在线程 “主要” org.apache.spark.SparkException

例外:任务不序列化 的组织。 apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner $ .org $ apache $ spark $ util $ ClosureCleaner $$ clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner $ .clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:919) at org.apache.spark.rdd.RDD $$ anonfun $ foreachPartition $ 1.apply(RDD.scala:918) at org.apache.spark.rdd .RDDOperationScope $ .withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope $ .withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala :316) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918) at com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply(WibeeeBatchJob.scala:59 ) at com.e nerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1.apply(WibeeeBatchJob.scala:54) at scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable。在com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob.main上的ArrayOps $ ofRef.foreach(ArrayOps.scala:108) (WibeeeBatchJob.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect。NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在java.lang.reflect.Method.invoke(Method.java:498) 在org.apache。 spark.deploy.SparkSubmit $ .org $ apache $ spark $ deploy $ SparkSubmit $$ runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit $ .submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit $ .main(SparkSubmit.scala:121) at org.apache.spark.deploy。 SparkSubmit.main(SparkSubmit.scala) 引起:java.io.NotSerializableException:org.apache.spark.SparkContext 序列化堆栈: - object not serializable(class:org.apache.spark.SparkContext,value:[email protected]) - field(class:com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,name:sc $ 1,type:class org.apache.spark.SparkContext) - object(class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1,) - field(class:com。 enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ apply $ 1,name:$ outer,type:class com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1) - 对象(类com.enerbyte.spark.jobs.wibeeebatch.WibeeeBatchJob $$ anonfun $ main $ 1 $$ anonfun $ apply $ 1) at org.apache.spark.serializer.SerializationDebugger $ .improveException(SerializationDebugger.scala:40) 在org.apache.spark.serializer.JavaSerializationS tream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:301 )

+0

什么是特定异常和导致该问题的具体代码? –

回答

1

最简单的做法是使用Spark Cassandra Connector它可以处理连接池和序列化。

有了,你可以做类似

sc.parallelize(inputData, numTasks) 
    .mapPartitions { it => 
    val con = CassandraConnection(yourConf) 
    con.withSessionDo{ session => 
     //Use the session 
    } 
    //Do any other processing 
    }.saveToCassandra("ks","table" 

这将是一个卡桑德拉连接的完全手动操作。会话将全部自动合并并缓存,如果您准备声明,那么这些会话也会缓存在执行程序中。

如果你想使用更多的内置方法,也有joinWithCassandraTable这可能适合你的情况。

sc.parallelize(inputData, numTasks) 
    .joinWithCassandraTable("ks","table") //Retrieves all records for which input data is the primary key 
    .map(//manipulate returned results if needed) 
    .saveToCassandra("ks","table") 
+0

你不明白我的问题。 这是我是否可以在并行模式下执行100个不同的查询(查询的不同参数),以便执行者是执行查询的人(这意味着他们是需要Spark上下文实例的人)... 但我意识到spark的上下文不能(也不应该)发送给executors,所以我需要改变我的管道结构。 –

+0

我给出的例子都有执行者执行查询。 – RussS