1
Apache Spark如何检测重复行?Apache Spark如何检测重复项?它可以被修改吗?
的原因,我问的是,我想有一个稍微不同的行为:
在设定的用于重复检测列,对于他们中的一些(这是double
型)我想是重复检测基于两个值之间的差异低于某个阈值(由我指定)。
我想这可能会使用crossJoin()
与适当的where
声明后,但是,我希望有一个更优雅的解决方案?
谢谢!
Apache Spark如何检测重复行?Apache Spark如何检测重复项?它可以被修改吗?
的原因,我问的是,我想有一个稍微不同的行为:
在设定的用于重复检测列,对于他们中的一些(这是double
型)我想是重复检测基于两个值之间的差异低于某个阈值(由我指定)。
我想这可能会使用crossJoin()
与适当的where
声明后,但是,我希望有一个更优雅的解决方案?
谢谢!
它使用HashArggregate
:
scala> df.distinct.explain
== Physical Plan ==
*HashAggregate(keys=[x#12], functions=[])
+- Exchange hashpartitioning(x#12, 200)
+- *HashAggregate(keys=[x#12], functions=[])
+- LocalTableScan [x#12]
我希望一个更优雅的解决方案?
您可以尝试近似加入由LSH运营商提供:
但它是不可能与单一功能的工作。
您可以对窗口函数使用类似会话的方法,但只有在可以将数据划分为多个分区时才有用。如果你是罚款近似可以使用固定大小的范围,然后申请我所描述的方法在Spark - Window with recursion? - Conditionally propagating values across rows
与sort
随后与mapPartitions
另一种近似可以实现。
df.sortBy("someColumn").rdd.mapPartitions(drop_duplicates).toDF()
其中dropDuplicates
可以实施类似于:
def drop_duplicates(xs):
prev = None
for x in xs:
if prev is None or abs(x - prev) > threshold:
yield x
prev = x
随着一点点努力,你可以把它在分区边界一致为好。
感谢您的好和有趣的指针。问题是:我试图得到一个*确切的*解决方案。对于一个近似的解决方案,我可以根据阈值进行乘/除/整,然后完成。顺便说一句:我有一个'groupID'列可以用于'Window.partitionBy('groupID')'。 –