2017-04-25 36 views
1

我有这段代码:映射一个RDD的值,以他们的字典中的值

List tmp = colRDD.collect(); 
int ctr = 0; 
for(Object o : tmp){ 
    if (!dictionary.containsKey(o)) { 
     dictionary.put(o, ctr++); 
    } 
} 
revDictionary = dictionary.entrySet().stream() 
     .collect(Collectors.toMap(Entry::getValue, c -> c.getKey())); 
colRDD = colRDD.map(x -> {return dictionary.get(x);}); 

一开始,我兑现了RDD,并把每个值在哈希表,其中RDD值是关键。 然后,我简单的想在RDD每个值映射到他们的字典值。 不过,我得到一个Task not serializable错误。这是为什么 ?

回答

3

这将通过试图访问一个变量作用域到驱动器,从内,其由执行程序代码评估引起。

鉴于你的示例代码,最有可能的罪魁祸首是在这行代码dictionary

colRDD = colRDD.map(x -> {return dictionary.get(x);}); 

但是这个问题也可以从另一个在你的代码比你在这里提供的到来,所以你可能还需要检查。

这样做的原因是因为dictionary驻留在你的驱动程序,这很可能是在一个单独的JVM实例比你的遗嘱执行人运行内存。您传递给colRDD.map的lambda由执行者评估,而非司机。该函数被序列化为要执行的任务,并发送给执行器以便运行。但是Spark引擎无法连续执行任务,因为dictionary的“关闭”,因此是例外。

+0

我知道错误来自那里。不过,为什么不这样做。 – SpiderRico

+0

增加了更多的细节来帮助理解发生的事情。 – ImDarrenG

+0

@ImDarrenG答案是正确的,但是如果每次调用地图时使用广播都不发送字典将会很好 –