如何使用Spark-Scala中的takeOrdered(4)(Ordering[Int])
方法对2列或多列进行排序。如何使用takeOrdered对多个列进行排序?
此使用sortBy这样我可以实现:
lines.sortBy(x => (x.split(",")(1).toInt, -x.split(",")(4).toInt)).map(p => println(p)).take(50)
但是当我尝试使用takeOrdered接近其未能
如何使用Spark-Scala中的takeOrdered(4)(Ordering[Int])
方法对2列或多列进行排序。如何使用takeOrdered对多个列进行排序?
此使用sortBy这样我可以实现:
lines.sortBy(x => (x.split(",")(1).toInt, -x.split(",")(4).toInt)).map(p => println(p)).take(50)
但是当我尝试使用takeOrdered接近其未能
排序您可以实现自定义排序:
lines.takeOrdered(4)(new Ordering[String] {
override def compare(x: String, y: String): Int = {
val xs=x.split(",")
val ys=y.split(",")
val d1 = xs(1).toInt - ys(1).toInt
if (d1 != 0) d1 else ys(4).toInt - xs(4).toInt
}
})
tl; dr做一些这样的事情(但是考虑重写你的代码来调用split
一次):
lines.map(x => (x.split(",")(1).toInt, -x.split(",")(4).toInt)).takeOrdered(50)
这里是解释。
当你调用takeOrdered
直接lines
,生效的implicit Ordering
是Ordering[String]
因为lines
是RDD[String]
。您需要将lines
转换为新的RDD[(Int, Int)]
。因为有implicit Ordering[(Int, Int)]
可用,它会在您的转换RDD
上生效。
同时,sortBy
的工作原理有所不同。下面是签名:
sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
我知道这是一个令人生畏的签名,但如果你通过削减噪音,你可以看到sortBy
需要您的原始类型映射到一个新的类型只是排序目的的功能,如果其中一个在implicit
范围内,则适用于该返回类型的Ordering
。
在你的情况,你在你的RDD
应用功能的String
s到它们转变成如何星火应该仅仅把它们用于排序目的“视图”,即为(Int, Int)
,然后依靠implicit Ordering[(Int, Int)]
如上所述可用。
的sortBy
方法可以让你保持lines
完整为RDD[String]
和使用映射只是进行排序,而takeOrdered
方法上全新RDD
,包含从原始lines
衍生(Int, Int)
工作。哪种方法更适合您的需求取决于您希望完成什么。
在另一个说明中,您可能想要将您的代码重写为只有split
您的文本一次。
'线条'的类型是什么? –