2016-08-24 111 views
2

我试图运行在蜂巢查询:阿帕奇星火 - 蜂巢内部联接,LIMIT和定制UDF

这里是最简单的设置(我知道我可以做一个=,但使用自定义UDF它做更多的IM不仅仅是一个相等比较)

数据集a和b是约30,000行,每行

SELECT * FROM a INNER JOIN b ON Custom_UDF_Equals_Comparison(a.id, b.id) LIMIT 5

其中custom_UDF_Equals_Comparison简单地做a.id = b.id

之间的相等性检查

当我运行这个查询时,我可以在我的日志输出中看到很多m/r任务正在运行,假设它在两个数据集之间进行比较,直到比较所有可能的排列,并且远高于5的极限(我会因为我知道大多数数据可以在每个表的前几行中加入,所以只需要少量m/r任务),为什么会发生这种情况?和/或我该如何解决?

编辑:

喜zero323,这是一个类似的问题,但不完全一样,它解释了为什么使用UDF进行比较时执行2之间RDD的一个全面的比较,但它不解释为什么限制不停止比较时发现5的限制。例如,如果在前10次加入尝试中找到5行,为什么还会进行剩余的30,000 * 30,000次尝试。是否由于在所有连接发生后都应用限制的事实?例如它加入30,000 * 30,000行,然后将它们减少到5?

EDIT2:

def levenshtein(str1: String, str2: String): Int = { 
val lenStr1 = str1.length 
val lenStr2 = str2.length 

val d: Array[Array[Int]] = Array.ofDim(lenStr1 + 1, lenStr2 + 1) 

for (i <- 0 to lenStr1) d(i)(0) = i 
for (j <- 0 to lenStr2) d(0)(j) = j 

for (i <- 1 to lenStr1; j <- 1 to lenStr2) { 
    val cost = if (str1(i - 1) == str2(j-1)) 0 else 1 

    d(i)(j) = min(
    d(i-1)(j ) + 1,  // deletion 
    d(i )(j-1) + 1,  // insertion 
    d(i-1)(j-1) + cost // substitution 
) 
} 

d(lenStr1)(lenStr2) 

}

def min(nums: Int*): Int = nums.min 

def join_views(joinType: String, parameters: Any, col1: Any, col2: Any) : Boolean = { 
if (joinType == "Equals") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    return col1 == col2 
} 
else if (joinType == "Fuzzy_String") { 
    if (col1 == null || col2 == null) { 
    return false 
    } 

    val val1 = col1.asInstanceOf[String] 
    val val2 = col2.asInstanceOf[String] 

    val ratio = Utils.distancePercentage(val1, val2) 

    if (ratio == 1.0) { 
    return val1 == val2 
    } 

    return (ratio >= parameters.asInstanceOf[Double]) 
} 

return false; 

}

... ON join_views( “Fuzzy_String”, “0.1”,a.col1,b.col1)LIMIT 5 = 20secs

... ON join_views(“Fuzzy_String”,“0.9”,a.col1,b.col1)LIMIT 5 = 100secs

+0

能依然关闭,感谢您的帮助 –

+0

我居然发现了一些令人费解的,我custom_UDF也做了模糊检查,当我运行与0.1模糊值必须匹配,它非常快速的回报加入结果(例如,它匹配到5行非常快并返回),当我将它设置为0.9必须匹配它需要类似于UDF中的原始=比较。我想知道为什么0.1 DOES的模糊匹配回报更快?不一定是个问题,只是一个观察 –

+0

这实际上很有趣。你能分享一些实施细节吗? – zero323

回答

1

所以这里有三个不同的问题:

  • 星火优化利用散列和排序连接,以便这些优化仅适用于同等联接。其他类型的连接,包括取决于UDF的连接需要成对比较,因此需要笛卡尔积。详情请参阅Why using a UDF in a SQL query leads to cartesian product?
  • 数据移动后的限制操作,特别是混洗,无法完全优化。您可以在Sun Rui提供的nice answerTowards limiting the big RDD中找到一个很好的解释。

    由于缺乏随机播放,您的情况相当简单,但您仍然必须将分区放在一起。

  • 限制优化基于分区而不是记录。 Spark开始检查第一个分区,并且如果满足条件的元素数量少于所需的数量,它会迭代每次迭代所需的分区数量增加(据我记得的因子是4)。如果你正在寻找一个罕见的事件,这可以增加很快。