0
我有这段代码,它在独立工作时很好,但在AWS上的4个从属集群(8个内存30Go内存)上工作速度很慢。集群上的Spark作业比单独的更慢
For a file of 10000 entries
Standalone : 257s
Aws 4S : 369s
def tabHash(nb:Int, dim:Int) = {
var tabHash0 = Array(Array(0.0)).tail
for(ind <- 0 to nb-1) {
var vechash1 = Array(0.0).tail
for(ind <- 0 to dim-1) {
val nG = Random.nextGaussian
vechash1 = vechash1 :+ nG
}
tabHash0 = tabHash0 :+ vechash1
}
tabHash0
}
def hashmin3(x:Vector, w:Double, b:Double, tabHash1:Array[Array[Double]]) = {
var tabHash0 = Array(0.0).tail
val x1 = x.toArray
for(ind <- 0 to tabHash1.size-1) {
var sum = 0.0
for(ind2 <- 0 to x1.size-1) {
sum = sum + (x1(ind2)*tabHash1(ind)(ind2))
}
tabHash0 = tabHash0 :+ (sum+b)/w
}
tabHash0
}
def pow2(tab1:Array[Double], tab2:Array[Double]) = {
var sum = 0.0
for(ind <- 0 to tab1.size-1) {
sum = sum - Math.pow(tab1(ind)-tab2(ind),2)
}
sum
}
val w = ww
val b = Random.nextDouble * w
val tabHash2 = tabHash(nbseg,dim)
var rdd_0 = parsedData.map(x => (x.get_id,(x.get_vector,hashmin3(x.get_vector,w,b,tabHash2)))).cache
var rdd_Yet = rdd_0
for(ind <- 1 to maxIterForYstar ) {
var rdd_dist = rdd_Yet.cartesian(rdd_0).flatMap{ case (x,y) => Some((x._1,(y._2._1,pow2(x._2._2,y._2._2))))}//.coalesce(64)
var rdd_knn = rdd_dist.topByKey(k)(Ordering[(Double)].on(x=>x._2))
var rdd_bary = rdd_knn.map(x=> (x._1,Vectors.dense(bary(x._2,k))))
rdd_Yet = rdd_bary.map(x=>(x._1,(x._2,hashmin3(x._2,w,b,tabHash2))))
}
我试图播放一些变量
val w = sc.broadcast(ww.toDouble)
val b = sc.broadcast(Random.nextDouble * ww)
val tabHash2 = sc.broadcast(tabHash(nbseg,dim))
没有任何影响
我知道这不是巴里功能,因为我想这个代码的另一个版本,而不hashmin3与4个奴隶工作正常更糟糕的是8个奴隶是另一个话题。
“对于10000个条目的文件”这是一个小数据集。分配它的开销可能大于parallleliism的储蓄 –
我将自己的数据集制作成笛卡尔积,因此认为我有足够的数据。 – KyBe