2017-01-21 46 views
2

我正在使用HashPartioner但得到意外的结果。 我使用3个不同的字符串作为键,并给分区参数为3,所以我期望3个分区。Spark HashPartitioner意外分区

val cars = Array("Honda", "Toyota", "Kia") 

val carnamePrice = sc.parallelize(for { 
x <- cars 
y <- Array(100,200,300) 
} yield (x, y), 8) 
val rddEachCar = carnamePrice.partitionBy(new HashPartitioner(3)) 
val mapped = rddEachCar.mapPartitionsWithIndex{ 
       (index, iterator) => { 
        println("Called in Partition -> " + index) 
        val myList = iterator.toList 

        myList.map(x => x + " -> " + index).iterator 
       } 
      } 
mapped.take(10) 

结果如下。它只提供2个分区。我检查了字符串 (69909220 75427 -1783892706)的哈希码。这里有什么问题?可能我误解了分区算法。

Array[String] = Array((Toyota,100) -> 0, (Toyota,200) -> 0, (Toyota,300) -> 0, (Honda,100) -> 1, (Honda,200) -> 1, (Honda,300) -> 1, (Kia,100) -> 1, (Kia,200) -> 1, (Kia,300) -> 1) 

回答

2

这里没有什么奇怪的。 Utils.nonNegativeMod,用于通过HashPartitioner实现如下:

def nonNegativeMod(x: Int, mod: Int): Int = { 
    val rawMod = x % mod 
    rawMod + (if (rawMod < 0) mod else 0) 
} 

有三个分区密钥分发的定义如下图所示:

for { car <- Seq("Honda", "Toyota", "Kia") } 
    yield (car -> nonNegativeMod(car.hashCode, 3)) 
Seq[(String, Int)] = List((Honda,1), (Toyota,0), (Kia,1)) 

这正是你在你的情况下获得。换句话说,缺少直接的散列冲突并不能保证模块间没有任何数量的冲突。