2016-04-26 78 views
0

我目前正在使用Spark和Scalacheck,并试图过滤RDD [(A,Long)] (其中A是从Avro文件读取的寄存器,Long是从zipWithUniqueId()函数获得)从存储在缓冲区中的相同RDD中取出。过滤来自另一个元组列表的元组的RDD

我的意图是测试该样本的一些属性,一旦失败,请在该RDD的样本中再次测试该属性,该样本不包含之前取样的任何值。 我将rdd存储在一个var中,所以我可以重新分配一次我过滤它。 我的代码是这样的:

val samplingSeed = new Random(System.currentTimeMillis()).nextLong() 
val sampled = rdd.takeSample(withReplacement = false, bufferSize, samplingSeed) 
val buffer: JQueue[(A, Long)] = new JConcurrentLinkedQueue[(A, Long)] 

//Sampled as Array converts to queue 
for (i <- 0 to sampled.length - 1) 
buffer.add(sampled(i).asInstanceOf[(A, Long)]) 

//rdd is assigned to a var for persistence 
//filter here and leave out all the tuples in buffer based in the 
//Long value in each tuple 
rdd= rdd.filter{foo} 

我怎么能做到这一点?

+1

您可以播放设定采样的ID,并在过滤器检查的ID是集:'VAL IDS = sc.broadcast( buffer.toSet.map(_._ 2)); rdd.filter(v =>!ids.value.contains(v._2))' –

+0

像魅力一样工作谢谢 – mtelloz

+0

不客气,我已经创建了答案。请接受它 –

回答

0

在一般情况下,通过设置过滤可以使用广播变量来实现:

val rdd = sc.parallelize((1 to 10).toSeq) 
val ids = sc.broadcast(Set(1, 2, 3)) 
rdd.filter(v => !ids.value.contains(v)).collect() 
res1: Array[Int] = Array(4, 5, 6, 7, 8, 9, 10)