2016-10-25 152 views
0

我有一些json数据,它们是以int作为键和int作为值列表的键值对。我想将这些数据读入地图,然后播放它,以便其他RDD可以使用它进行快速查找。无法从Spark 2.0.1中的数据集/数据帧收集数据;得到ClassCastException

我有一个工作在数据中心1.6.1 Spark集群的代码,但在AWS中的2.0.1 Spark集群中无法使用相同的代码。该1.6.1代码工作:

import scala.collection.mutable.WrappedArray 
sc.broadcast(sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getAs[WrappedArray[Int]].toArray)).collectAsMap) 

我为2.0.1我曾尝试:

val myData = sqlContext.read.schema(mySchema).json(myPath).map(r => (r.getInt(0), r.getSeq[Int].toArray)) 

这让我什么,我想在这一点上:

org.apache.spark.sql.Dataset[(Int, Array[Int])] = [_1: int, _2: array<int>] 

但随后当我这样做时:

sc.broadcast(myData.rdd.collectAsMap) 

我得到:

java.lang.ClassCastException:无法将scala.collection.immutable.List的实例分配给$ SerializationProxy字段org.apache.spark.rdd.RDD.org $ apache $ spark $ rdd $ RDD $$ dependencies_类型scala .collection.Seq在org.apache.spark.rdd.MapPartitionsRDD的实例

有谁知道我怎么能在2.0.1中做到这一点?这是我想要做的一件非常简单的事情。

由于提前,

罗宾

+0

嘿,我也是刚发布了同样的问题..帮助需要的情况下,你找到一个..... https://stackoverflow.com/questions/40233215/unable-to-read-and-later- query-text-file-in-apache-spark – Tanny

回答

0

我想通了,我的问题是火花壳2.0.1。我发布的代码工作正常,如果我使用shell创建的spark会话的一部分的现有sc和sqlContext。如果我使用自定义配置调用stop并创建一个新的会话,我将会得到上面的奇怪错误。我不喜欢这个,因为我想更改spark.driver.maxResultSize。

无论如何,教训是:如果您使用spark shell测试代码,请使用现有会话,否则它可能无法工作。

+0

您应该能够停止会话并创建一个新会话。它可能是某种竞争条件吗?你刚刚停止旧的会议后开始新的会议吗? –