2017-04-11 26 views
0

我不明白下面的代码有什么问题。这工作正常,并且hashmap typeMap得到更新,如果我的输入数据框未分区。但是,如果下面的代码在分区环境中执行,则typeMap始终为空且不会更新。这段代码有什么问题?感谢你的帮助。斯卡拉hashmap没有被追加

var typeMap = new mutable.HashMap[String, (String, Array[String])] 
case class Combiner(,,,,,,, mapTypes: mutable.HashMap[String, (String, Array[String])]) { 
    def execute() { 
     <...> 
     val combinersResult = dfInput.rdd.aggregate(combiners.toArray) (incrementCount, mergeCount) 
    } 

    def updateTypes(arr: Array[String], tempMapTypes:mutable.HashMap[String, (String, Array[String])]): Unit = { 
     <...> 
     typeMap ++= tempMapTypes 
    } 

    def incrementCount(combiners: Array[Combiner], row: Row): Array[Combiner] = { 
     for (i <- 0 until row.length) { 
      val array = getMyType(row(i), tempMapTypes) 
      combiners(i). updateTypes(array, tempMapTypes) 
     } 
     combiners 
} 
+0

嗨,有什么线索吗? – Garipaso

回答

2

在分布式计算中使用可变值是一个非常糟糕的主意。特别是使用Spark时,RDD操作从驱动程序交付给执行程序,并在集群中的所有不同机器上并行执行。对您的mutable.HashMap所做的更新永远不会发回给驱动程序,所以您首先遇到了在驱动程序中构建的空白地图。

因此,您需要完全重新考虑您的数据结构,因为它们更喜欢不变性,并且要记住,在执行程序上执行的操作是独立且平行的。

+0

非常感谢回复,我现在明白了,但在这里继续的理想方式是什么?我需要更新的地图进行进一步处理。 – Garipaso

+0

如果您只是在您对RDD进行操作时创建带有附加值的新地图,则不需要将地图设置为可变,但是如果您必须使用可变地图,则至少需要在操作中对其进行初始化,以便每台机器都有其地图自己的副本。 – Vidya

+0

再次感谢,我已经尝试过,更新每个分区中的地图,然后使用combine()从所有分区追加地图,但结果耗时,有没有更简单的方法来实现这一点? – Garipaso