2015-07-21 28 views
0

我有这段代码,它在独立工作时很好,但在AWS上的4个从属集群(8个内存30Go内存)上工作速度很慢。集群上的Spark作业比单独的更慢

For a file of 10000 entries 
Standalone : 257s 
Aws 4S : 369s 

    def tabHash(nb:Int, dim:Int) = { 

     var tabHash0 = Array(Array(0.0)).tail 

     for(ind <- 0 to nb-1) { 
      var vechash1 = Array(0.0).tail 
      for(ind <- 0 to dim-1) { 
       val nG = Random.nextGaussian 
       vechash1 = vechash1 :+ nG 
      } 
      tabHash0 = tabHash0 :+ vechash1 
     } 
     tabHash0 
    } 

    def hashmin3(x:Vector, w:Double, b:Double, tabHash1:Array[Array[Double]]) = { 

     var tabHash0 = Array(0.0).tail 
     val x1 = x.toArray 
     for(ind <- 0 to tabHash1.size-1) { 
      var sum = 0.0 
      for(ind2 <- 0 to x1.size-1) { 
       sum = sum + (x1(ind2)*tabHash1(ind)(ind2)) 
      }   
      tabHash0 = tabHash0 :+ (sum+b)/w 
     } 
     tabHash0 

    } 

    def pow2(tab1:Array[Double], tab2:Array[Double]) = { 

     var sum = 0.0 
     for(ind <- 0 to tab1.size-1) { 
      sum = sum - Math.pow(tab1(ind)-tab2(ind),2) 
     } 
     sum 
    } 


     val w = ww 
     val b = Random.nextDouble * w 
     val tabHash2 = tabHash(nbseg,dim) 

     var rdd_0 = parsedData.map(x => (x.get_id,(x.get_vector,hashmin3(x.get_vector,w,b,tabHash2)))).cache 

     var rdd_Yet = rdd_0 

     for(ind <- 1 to maxIterForYstar ) { 

      var rdd_dist = rdd_Yet.cartesian(rdd_0).flatMap{ case (x,y) => Some((x._1,(y._2._1,pow2(x._2._2,y._2._2))))}//.coalesce(64) 

      var rdd_knn = rdd_dist.topByKey(k)(Ordering[(Double)].on(x=>x._2)) 

      var rdd_bary = rdd_knn.map(x=> (x._1,Vectors.dense(bary(x._2,k)))) 

      rdd_Yet = rdd_bary.map(x=>(x._1,(x._2,hashmin3(x._2,w,b,tabHash2)))) 


     } 

我试图播放一些变量

 val w = sc.broadcast(ww.toDouble) 
     val b = sc.broadcast(Random.nextDouble * ww) 
     val tabHash2 = sc.broadcast(tabHash(nbseg,dim)) 

没有任何影响

我知道这不是巴里功能,因为我想这个代码的另一个版本,而不hashmin3与4个奴隶工作正常更糟糕的是8个奴隶是另一个话题。

+0

“对于10000个条目的文件”这是一个小数据集。分配它的开销可能大于parallleliism的储蓄 –

+0

我将自己的数据集制作成笛卡尔积,因此认为我有足够的数据。 – KyBe

回答

1

错误代码。特别适用于分布式和大型计算。我不能快速说出问题的根源,但无论如何你必须重写这段代码。

  1. 数组对于通用和可共享的数据来说是很糟糕的。它是可变的,需要连续的内存分配(即使你拥有足够的内存,最后可能会出现问题)。更好地使用Vector(或有时列出)。千万不要使用数组。
  2. var vechash1 = Array(0.0).tail您使用一个元素创建集合,然后调用函数来获取空集合。如果它很少,不用担心表演,但它很丑! var vechash1: Array[Double] = Array()var vechash1: Vector[Double] = Vector()var vechash1 = Vector.empty[Double]
  3. def tabHash(nb:Int, dim:Int) =当它不清楚时总是设置函数的返回类型。斯卡拉的力量是丰富的类型系统。编译时间检查非常有用(关于你究竟得到的结果,而不是你想象的结果!)。处理大量数据时非常重要,因为编译检查会节省您的时间和金钱。稍后阅读这些代码也更容易。 def tabHash(nb:Int, dim:Int): Vector[Vector[Double]] =
  4. def hashmin3(x: Vector,错字?它不能编译没有类型参数。

第一功能更加紧凑:

def tabHash(nb:Int, dim:Int): Vector[Vector[Double]] = { 
    (0 to nb-1).map {_ => 
    (0 to dim - 1).map(_ => Random.nextGaussian()).toVector 
    }.toVector 
} 

第二个功能是((x*M) + scalar_b)/scalar_w。使用专门针对矩阵工作而优化的库可能更有效率。

三(我在这里想错与计算的迹象,如果算上方误差):

def pow2(tab1:Vector[Double], tab2:Vector[Double]): Double = 
     tab1.zip(tab2).map{case (t1,t2) => Math.pow(t1 - t2, 2)}.reduce(_ - _) 

var rdd_Yet = rdd_0 Cached RDD is rewrited in cycle. So it's useless storage. 

最后一个循环是很难分析。我认为它必须简化。

相关问题