2015-06-20 40 views
18

我是Apache Spark的新手,正在学习基本功能。 有一个小小的怀疑。假设我有一个元组的RDD(键,值),并希望从它们中获得一些独特的元组。我使用distinct()函数。我想知道函数在什么基础上认为元组是不同的..?它是基于键或值还是两者?Distinct()函数在Spark中如何工作?

回答

8

RDD.distinct() API文档只提供一个一句话描述:

“返回一个新的RDD包含此RDD的不同的元素。”

根据最近的经验,我可以告诉你,在一个元组-RDD中,元组作为一个整体被考虑。

如果你想不同的键或不同的值,然后根据你想要完成什么,你可以:

A.呼叫groupByKey()改造{(k1,v11),(k1,v12),(k2,v21),(k2,v22)}{(k1,[v11,v12]), (k2,[v21,v22])};或

B.通过调用keys()values()其次distinct()

去掉无论是键或值到本文截稿时(2015年6月),加州大学伯克利分校+ EDX运行免费的在线课程Introduction to Big Data and Apache Spark这将在提供手练习这些功能。

+0

保罗嗨!假设我们有一个RDD元组如下: (1,20),(1,21),(1,20),(2,20),(2,22),(2,20),( 3,21),(3,22)..等, 在这里你可以观察到键和值都在各种元组中重复。 因此,如果我在上述RDD上应用distinct(),结果是什么? 请花点时间。谢谢! 而且,是的,我在线上课! :) –

+2

我现在没时间了,但是你可以用'myRDD = sc.parallelize([(1,20),(1,21),(1,20),(2,20) ),(2,22),(2,20),(3,21),(3,22)]);'这甚至可以在Spark课程的其中一个实验室笔记本中工作。然后运行'myRDD.distinct()。collect()来测试输出' – Paul

4

distinct使用hashCodeequals方法确定对象。元组内置了平等机制,将其分解为每个对象的平等和位置。因此,distinct将对整个Tuple2对象起作用。正如Paul指出的那样,您可以拨打keysvalues,然后拨打distinct。或者你可以通过aggregateByKey编写自己独特的值,这将保持密钥配对。或者如果你想要不同的密钥,那么你可以使用常规的aggregate

+0

谢谢!说得通。 –

22

.distinct()肯定是在跨分区进行洗牌。要查看更多情况,请在RDD上运行.toDebugString。

val hashPart = new HashPartitioner(<number of partitions>) 

val myRDDPreStep = <load some RDD> 

val myRDD = myRDDPreStep.distinct.partitionBy(hashPart).setName("myRDD").persist(StorageLevel.MEMORY_AND_DISK_SER) 
myRDD.checkpoint 
println(myRDD.toDebugString) 

其中,用于RDD例如我有(myRDDPreStep已经由密钥散列分区,由StorageLevel.MEMORY_AND_DISK_SER依然存在,检查点),则返回:

(2568) myRDD ShuffledRDD[11] at partitionBy at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
+-(2568) MapPartitionsRDD[10] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    | ShuffledRDD[9] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
    +-(2568) MapPartitionsRDD[8] at distinct at mycode.scala:223 [Disk Memory Serialized 1x Replicated] 
     | myRDDPreStep ShuffledRDD[6] at partitionBy at mycode.scala:193 [Disk Memory Serialized 1x Replicated] 
     |  CachedPartitions: 2568; MemorySize: 362.4 GB; TachyonSize: 0.0 B; DiskSize: 0.0 B 
     | myRDD[7] at count at mycode.scala:214 [Disk Memory Serialized 1x Replicated] 

注意,有可能更有效特别是如果您的RDD已经以智能方式进行了分区并且分区不会过度倾斜,那么您可以采取各种方法来减少混洗。

Is there a way to rewrite Spark RDD distinct to use mapPartitions instead of distinct?Apache Spark: What is the equivalent implementation of RDD.groupByKey() using RDD.aggregateByKey()?

1

它看起来像distinct将摆脱(键,值)重复。

在下面的示例(1,20)和(2,20)在myRDD中重复两次,但在distinct()之后,删除了重复项。

scala> val myRDD = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[1274] at parallelize at <console>:22 

scala> myRDD.collect().foreach(println _) 
(1,20) 
(1,21) 
(1,20) 
(2,20) 
(2,22) 
(2,20) 
(3,21) 
(3,22) 

scala> myRDD.distinct.collect().foreach(println _) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 
6

Justin Pihony是对的.Distinct使用hashCode和equals方法确定对象。它的返回不同元素(对象)

val rdd = sc.parallelize(List((1,20), (1,21), (1,20), (2,20), (2,22), (2,20), (3,21), (3,22))) 

鲜明

rdd.distinct.collect().foreach(println) 
(2,22) 
(1,20) 
(3,22) 
(2,20) 
(1,21) 
(3,21) 

如果你想申请的关键区别。 在这种情况下减少是更好的选择

ReduceBy

val reduceRDD= rdd.map(tup => 
    (tup._1, tup)).reduceByKey { case (a, b) => a }.map(_._2) 

reduceRDD.collect().foreach(println) 

输出: -

(2,20) 
(1,20) 
(3,21)