2017-05-09 32 views
0

我试图在分区中添加分区索引和rownumber给rdd,我做到了。但是当我试图获得最后一个rownumber的值时,我得到零,rownumber数组似乎没有任何变化。变量范围问题?rowNumber()over(partition_index)在spark-shell中使用mapPartitionsWithIndex

它就像rowNumber()/ count()over(partition_index),但rownumber与分区索引一起在一个循环中添加,因此可能更有效?

这里谈到的代码:

scala> val rdd1 = sc.makeRDD(100 to 110) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[32] at makeRDD at <console>:25 

scala> val rownums=new Array[Int](3) 
rownums: Array[Int] = Array(0, 0, 0) 

scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums(idx)+=1;rownums(idx)}, r))) 
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[37] at mapPartitionsWithIndex at <console>:29 

scala> rdd2.collect.foreach(println) 
(0,1,100) 
(0,2,107) 
(0,3,104) 
(0,4,105) 
(0,5,106) 
(0,6,110) 
(1,1,102) 
(1,2,108) 
(1,3,103) 
(2,1,101) 
(2,2,109) 

scala> //uneffected?? 

scala> rownums.foreach(println) 
0 
0 
0 

scala> rownums 
res20: Array[Int] = Array(0, 0, 0) 

我期待(6,3,2),用于rownums :(


解决使用蓄电池:

scala> import org.apache.spark.util._ 
import org.apache.spark.util._ 

scala> val rownums=new Array[LongAccumulator](3) 
rownums: Array[org.apache.spark.util.LongAccumulator] = Array(null, null, null) 

scala> for(i <- 0 until rownums.length){rownums(i)=sc.longAccumulator("rownum_"+i)} 

scala> val rdd1 = sc.makeRDD(100 to 110) 
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[92] at makeRDD at <console>:124 

scala> val rownums2=new Array[Int](3) 
rownums2: Array[Int] = Array(0, 0, 0) 

scala> val rdd2=rdd1.repartition(3).mapPartitionsWithIndex((idx, itr) => itr.map(r => (idx, {rownums2(idx)+=1;rownums(idx).add(1);rownums2(idx)}, r))) 
rdd2: org.apache.spark.rdd.RDD[(Int, Int, Int)] = MapPartitionsRDD[97] at mapPartitionsWithIndex at <console>:130 

scala> rdd2.collect.foreach(println) 
(0,1,107)                  
(0,2,106) 
(0,3,105) 
(0,4,110) 
(0,5,104) 
(0,6,100) 
(1,1,102) 
(1,2,103) 
(1,3,108) 
(2,1,109) 
(2,2,101) 

scala> rownums.foreach(x=>println(x.value)) 
6 
3 
2 

scala> 
+0

你想做什么,得到每个分区的行数? – puhlen

+0

我正在尝试将row_number添加到rdd分区的每一行,并按顺序获取行数。累加器解决了我的问题。 – myeyre

回答

0

请从编程指南中读取Understanding closures

在执行之前,Spark计算任务的关闭。闭包是那些执行程序在RDD上执行其计算(在本例中为foreach())时必须可见的变量和方法。该封闭序列化并发送给每个执行者。

发送给每个执行程序的闭包中的变量现在是副本,因此,当在foreach函数中引用计数器时,它不再是驱动程序节点上的计数器。驱动程序节点的内存中仍有一个计数器,但执行程序对此不再可见!执行者只能看到序列化闭包的副本。因此,计数器的最终值仍然为零,因为计数器上的所有操作都引用了序列化闭包内的值。

您正在修改变量的本地副本,而不是原始变量。

+0

是否可以从执行程序到驱动程序检索本地副本? – myeyre

+0

感谢您的链接,将检查累加器。 – myeyre

0

Spark在分布式系统中运行。这意味着您无权修改函数外的元素。

如果你想获得一个数组的每个分区的计数,你需要将你的RDD转换为RDD[Int],其中每行是分区的计数,然后收集它。

rdd.mapPartitions(itr => Iterator(itr.size)) 

如果分区索引是很重要的,你可以创建和RDD[Int,Int]给行数包括它一起。

rdd.mapPartitionsWithIndex((idx, itr) => Iterator((idx, itr.size)))