2017-03-24 76 views
0

我想弄清楚如何消除交叉连接中给出两个数据集的重复。火花交叉连接消除重复

例如客户端(clientDs)

clientDs = sc.parallelize(List('c1', 'c2', 'c3')) 

clientMatrixDs = clientDs.join(clientDs) 

clientProduct.show() 

-- output 

c1, c1 
c1, c2 
c1, c3 
c2, c1 
c2, c2 
c2, c3 
c3, c1 
c3, c2 
c3, c3 

在这种情况下, 两个(C1,C2)和(C2,C1)是重复的,我需要重复数据删除它

I”米不知道如何做到这一点

在这里寻找一些想法。

+0

如何获得'(p1,c1)'作为此连接的结果?客户和产品是否属于同一类型?通常情况下,如果连接不在初始集合中,那么重复就不会出现在连接中。 –

+0

@CyrilleCorpet是的,我刚刚更新了这个例子。谢谢! –

回答

2

在你的榜样,你有一大堆的Tuple2[String, String] s,而问题是,你想要的Tuple2只是没有在那里(a, b) == (b, a)的平等。这就是为什么distinct不适合你。因此您必须提供您自己的自定义平等。

是你不希望覆盖Tuple2的版本equals,因为这可能是危险的,所以你可以提供一个自定义的东西等于地方:

def customEquals(tuple1: (String, String), tuple2: (String, String)) = { 
    tuple1 == tuple2 || (tuple1._1 == tuple2._2 && tuple1._2 == tuple2._1) 
} 

然后你可以在使用此功能filter摆脱您的自定义副本:

val deduped = clientMatrixDs.filter { 
    var seq = Seq.empty[(String, String)] 
    tuple => 
    if (seq.exists(customEquals(tuple, _))) { 
     false 
    } else { 
     seq :+= tuple 
     true 
    } 
} 
0

RDD s有一个方法distinct(),它应该作为List上的等效工作。然而,我不知道它的表演。

编辑

然而,这并不会因为在斯卡拉(a, b) != (b, a)工作。所以你必须交换一些你的元素,以确保你没有得到任何双打。

如果您对您的类型有一些订购,您可以将所有配对映射到它们的订购等价物。例如,地图(2, 1)(1, 2)(3, 4)(3, 4)。然后你可以使用distinct,这将删除所有重复。

val distinctPairsRDD: RDD[(T, T)] = rdd.map{ 
    case (a, b) if a <= b => (a, b) 
    case (a, b) => (b, a) 
}.distinct() 

如果你没有这样的排序,你可以通过一些Set s,这是无序代替你对。所以,你可以按照如下映射您RDD

val distinctRDD: RDD[Set[T]] = rdd.map { 
    case (a, b) => Set(a, b) 
}.distinct() 

然而,这将失去你有型,所以你可能需要回去后对。要做到这一点,请记住Set本身没有重复项,所以如果您有一个具有相同元素两次的对,它将被映射到只有一个元素的集合。

所以你必须做到以下几点:

val distinctPairs: RDD[(T, T)] = distinctRdd.collect { 
    case Set(a) => (a, a) 
    case Set(a, b) => (a, b) 
} 

collect可以由map所取代,但它可能会诱发一些MatchError如果你以后更改您的代码。这会使所有其他情况(如果您恰好有一个空集或具有多于两个元素的集)被丢弃,因此请确保您对未来更改(而非RuntimeError或丢弃的元素)更喜欢什么。

TL; DR

尝试订购的元素在你的对,拿到唯一性。如果这不起作用,使用Set s,但它会更复杂。

+0

对不起,这不起作用......我已经试过了。 –