2015-09-25 25 views
0

我试图模糊地连接两个数据集,其中一个引号和其中一个销售。出于参数的原因,加入属性是名字,姓氏,dob和电子邮件。平面图内的星图复制笛卡尔连接

我有26m +的报价和1m +的销售额。客户可能没有使用一个或多个属性的准确信息,所以我给他们每个匹配的分数(1,1,1,1),其中全部匹配(0,0,0,0),其中没有比赛。

所以我结束了类似

q1, s1, (0,0,1,0) 
q1, s2, (0,1,0,1) 
q1, s3, (1,1,1,1) 
q2, s1, (1,0,0,1) 
... 
q26000000 s1 (0,1,0,0) 

东西,所以我认为这是相当于我在我的管理使得大量分区的报价笛卡尔积

val quotesRaw = sc.textfile(....) 
val quotes = quotesRaw.repartition(quotesRaw.count().toInt()/100000) 

val sales = sc.textfile(...) 
val sb = sc.broadcast(sales.collect()) 

quotes.mapPartitions(p=> (
    p.flatMap(q => (
     sb.value.map(s => 
      q._1, s._1, (if q._2 == s._2 1 else 0, etc) 
    ) 
) 

这一切都有效,如果我保持数字低,如26米报价,但只有1000销售,但如果我运行它将所有销售它只是停止响应时运行

我'用下面的配置运行它。

spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new 

有没有什么东西跳出来显然不正确?

+0

这要看情况。从RDD创建广播是一个昂贵的过程,尤其是当RDD很大时(单个销售记录的大小是多少?)。首先,您必须将所有数据传输到驱动程序,然后分发给工作人员。这意味着重复的序列化/反序列化,网络流量和存储数据的成本。 – zero323

+1

@ zero323一旦我已经预测出我感兴趣的属性,它对于11m的销售量最终只有40mb。在修剪为必需的属性后,引号结束大约3GB。鉴于3节点盒子上的这些尺寸,我不知道为什么它如此缓慢。 (即使接受q * s性质的结果 – owen79

+0

因此,广播不太可能是一个问题,另一个腥味是笛卡尔产品本身,每个分区10k报价和11M销售,每个分区产生大约4TB(11e11条目)if我正确的计算,我有理由在flatMap中过滤相似性很低的文件, – zero323

回答

1

假设每个分区有10万个报价,总大小为40MB的11M个销售,您的代码会为每个分区产生大约4TB的数据,所以您的员工不太可能处理这个问题,而且绝对不能在内存中完成。

我假设你只对近距离匹配感兴趣,所以过早过滤是有意义的。简化您的一些代码(据我可以告诉有没有理由使用mapPartitions):

// Check if match is close enough, where T is type of (q._1, s._1, (...)) 
def isCloseMatch(match: T): Boolean = ??? 

quotes.flatMap(q => sb.value 
    .map(s => (q._1, s._1, (....))) // Map as before 
    .filter(isCloseMatch) // yield only close matches 
) 

总论:

  • 从RDD创建广播是昂贵的过程。首先,您必须将所有数据传输到驱动程序,然后分发给工作人员。它意味着重复存储数据
  • 对于相对简单的操作,这样就可以使用高国家级星火SQL API是一个好主意的序列化/反序列化,网络流量和费用:

    import org.apache.spark.sql.DataFrame 
    
    val salesDF: DataFrame = ??? 
    val salesDF: DataFrame = ??? 
    val featureCols: Seq[String] = ??? 
    val threshold: Int = ??? 
    
    val inds = featureCols // Boolean columns 
        .map(col => (quotesDF(col) === salesDF(col)).alias(s"${col}_ind")) 
    
    val isSimilar = inds // sum(q == s) > threshold 
        .map(c => c.cast("integer").alias(c.toString)) 
        .reduce(_ + _) 
        .geq(threshold) 
    
    val combined = quotesDF 
        .join(salesDF, isSimilar, "left") 
    
+0

非常感谢,我会给它一个提示,在得分之前做一些过滤有什么好处吗?是否会减少销售数量如果已经被过滤了,比上面要检查的更多吗?比如'.filter(q.email == s.email).map(q._1,q._2 ......)' – owen79

+0

是的,虽然如果你有一些领域,你总是想用这个领域的Map来代替线性搜索。像'val sb = sc.broadcast(sales.map(s =>(s.email,s))。collectAsMap'。 – zero323