2015-10-05 42 views
0

我试图在以下文本文件上运行sortByKey()函数。Spark Scala中的sortByKey()函数无法正常工作

EMP_NAME EMP_ID SALARY 
Adam  22  100 
Bob  25  102 
Bob  28  104 
Chris 29  110 

我正在EMP_NAME作为下列文本文件的关键。我运行下面的命令:textFile.sortByKey() 我正在以下的输出:

Bob 
Bob 
Adam 
Chris 

帮助是appreciated..Thank你。

+3

请添加您的代码,以便于您的帮助。 – ale64bit

回答

4

如果您正在使用SparkConffiguration作为

val conf = new SparkConf().setMaster("local") 

则默认情况下创建的分区的数量为1

但是,如果你正在使用

val conf = new SparkConf().setMaster("local[*]") 

和你有额外的核心可用于Spark,它将根据它对数据进行分区,以便Spark能够并行执行任务。

要获得分区火花的数量已经作出:

println(partitions.length) 
//For my machine it was 2 

如果数据被分配,然后将分选在该分区仅,并在从每个分区输出端被合并在的元素来完成。为了避免这种情况,您可以在sortByKey方法中将numPartition强制为1,并将数据放入一个分区,然后对其进行排序。

textFile.sortByKey(numPartitions = 1).foreach(println) 

这将使分割成1和你会得到整个输入数据正确排序的输出。

+1

这个答案如此救了我的一天,谢谢! –

+0

为了获得RDD调用分区的数量'getNumPartitions()' – wbmrcb

0

在这里,我提供数据集和代码来执行按键排序的功能,如果你不觉得有帮助,那么请提供我们的代码,我们将研究这个问题。

数据 - > (制表符分隔文件)

EMP_NAME EMP_ID SALARY 
Adam 22 100 
Bob 25 102 
Bob 28 104 
Chris 29 110 

代码 - >

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

/* 
* @author Kshitij Kulshrestha 
*/ 

object A1 { 
def main(args: Array[String]): Unit = { 

// set up environment 
val sparkHome = "/usr/spark_pack/spark-1.4.1-bin-hadoop2.4/"; 
val sparkMasterUrl = "spark://SYSTEMX:7077"; 

val conf = new SparkConf() 
.setAppName("A1") 
.setMaster("local[2]") 
.setSparkHome(sparkHome) 

val sc = new SparkContext(conf) 

val dataRDD = sc.textFile("src/Source/A1_data").filter { !_.contains("EMP_NAME") } 
.map { x => 
{ 
val temp = x.split("\t") 

((temp(0)), (temp(1), temp(2))) 
} 
} 

val sortedDataRDD = dataRDD coalesce(1) sortByKey() 
sortedDataRDD foreach (println(_)) 

} 
} 

输出 - >

(Adam,(22,100)) 
(Bob,(25,102)) 
(Bob,(28,104)) 
(Chris,(29,110)) 
+0

这个代码在集群中不会工作,或者当它作为本地[*]分区数不会保持为1时。 –

+0

检查它,它将工作。 –

0

的Python:

sc.parallelize([['Chris',29,110],['Bob',28,104],['Bob',25,102],['Adam',22,100]]).groupBy(lambda x: x[0]).sortByKey().flatMap(lambda x: list(x[1])).collect() 

[[ '亚当',22,100],[ '鲍勃',25,102],[ '鲍勃',28,104],[ '克里斯',29,110]]

斯卡拉:

sc.parallelize(List(Array("Chris",29,110),Array("Bob",28,104),Array("Bob",25,102),Array("Adam",22,100))).groupBy(x => x(0).asInstanceOf[String]).sortByKey().flatMap(x=> x._2).collect() 

数组[数组[不限] =阵列(阵列(亚当,22,100),阵列(鲍勃,28,104),阵列(鲍勃,25,102),阵列(克里斯,29,110))

你可能想把其他列一个如果你想将它们包含在你的分类标准中,那么它就是你的密钥的一部分。所以在上面的例子中,第二列的Bob排序不会在那里。