2017-04-22 58 views
3

如何使用Spark-Scala中的takeOrdered(4)(Ordering[Int])方法对2列或多列进行排序。如何使用takeOrdered对多个列进行排序?

此使用sortBy这样我可以实现:

lines.sortBy(x => (x.split(",")(1).toInt, -x.split(",")(4).toInt)).map(p => println(p)).take(50) 

但是当我尝试使用takeOrdered接近其未能

+0

'线条'的类型是什么? –

回答

0

排序您可以实现自定义排序:

lines.takeOrdered(4)(new Ordering[String] { 
    override def compare(x: String, y: String): Int = { 
    val xs=x.split(",") 
    val ys=y.split(",") 
    val d1 = xs(1).toInt - ys(1).toInt 
    if (d1 != 0) d1 else ys(4).toInt - xs(4).toInt 
    } 
}) 
1

tl; dr做一些这样的事情(但是考虑重写你的代码来调用split一次):

lines.map(x => (x.split(",")(1).toInt, -x.split(",")(4).toInt)).takeOrdered(50) 

这里是解释。

当你调用takeOrdered直接lines,生效的implicit OrderingOrdering[String]因为linesRDD[String]。您需要将lines转换为新的RDD[(Int, Int)]。因为有implicit Ordering[(Int, Int)]可用,它会在您的转换RDD上生效。

同时,sortBy的工作原理有所不同。下面是签名:

sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] 

我知道这是一个令人生畏的签名,但如果你通过削减噪音,你可以看到sortBy需要您的原始类型映射到一个新的类型只是排序目的的功能,如果其中一个在implicit范围内,则适用于该返回类型的Ordering

在你的情况,你在你的RDD应用功能的String s到它们转变成如何星火应该仅仅把它们用于排序目的“视图”,(Int, Int),然后依靠implicit Ordering[(Int, Int)]如上所述可用。

sortBy方法可以让你保持lines完整为RDD[String]和使用映射只是进行排序,而takeOrdered方法上全新RDD,包含从原始lines衍生(Int, Int)工作。哪种方法更适合您的需求取决于您希望完成什么。

在另一个说明中,您可能想要将您的代码重写为只有split您的文本一次。