2017-08-24 40 views
1

Apache Spark如何检测重复行?Apache Spark如何检测重复项?它可以被修改吗?

的原因,我问的是,我想有一个稍微不同的行为:

在设定的用于重复检测列,对于他们中的一些(这是double型)我想是重复检测基于两个值之间的差异低于某个阈值(由我指定)。

我想这可能会使用crossJoin()与适当的where声明后,但是,我希望有一个更优雅的解决方案?

谢谢!

回答

1

它使用HashArggregate

scala> df.distinct.explain 
== Physical Plan == 
*HashAggregate(keys=[x#12], functions=[]) 
+- Exchange hashpartitioning(x#12, 200) 
    +- *HashAggregate(keys=[x#12], functions=[]) 
     +- LocalTableScan [x#12] 

我希望一个更优雅的解决方案?

您可以尝试近似加入由LSH运营商提供:

但它是不可能与单一功能的工作。

您可以对窗口函数使用类似会话的方法,但只有在可以将数据划分为多个分区时才有用。如果你是罚款近似可以使用固定大小的范围,然后申请我所描述的方法在Spark - Window with recursion? - Conditionally propagating values across rows

sort随后与mapPartitions另一种近似可以实现。

df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF() 

其中dropDuplicates可以实施类似于:

def drop_duplicates(xs): 
    prev = None 
    for x in xs: 
     if prev is None or abs(x - prev) > threshold: 
      yield x 
     prev = x 

随着一点点努力,你可以把它在分区边界一致为好。

+0

感谢您的好和有趣的指针。问题是:我试图得到一个*确切的*解决方案。对于一个近似的解决方案,我可以根据阈值进行乘/除/整,然后完成。顺便说一句:我有一个'groupID'列可以用于'Window.partitionBy('groupID')'。 –

相关问题