我试图模糊地连接两个数据集,其中一个引号和其中一个销售。出于参数的原因,加入属性是名字,姓氏,dob和电子邮件。平面图内的星图复制笛卡尔连接
我有26m +的报价和1m +的销售额。客户可能没有使用一个或多个属性的准确信息,所以我给他们每个匹配的分数(1,1,1,1),其中全部匹配(0,0,0,0),其中没有比赛。
所以我结束了类似
q1, s1, (0,0,1,0)
q1, s2, (0,1,0,1)
q1, s3, (1,1,1,1)
q2, s1, (1,0,0,1)
...
q26000000 s1 (0,1,0,0)
东西,所以我认为这是相当于我在我的管理使得大量分区的报价笛卡尔积
val quotesRaw = sc.textfile(....)
val quotes = quotesRaw.repartition(quotesRaw.count().toInt()/100000)
val sales = sc.textfile(...)
val sb = sc.broadcast(sales.collect())
quotes.mapPartitions(p=> (
p.flatMap(q => (
sb.value.map(s =>
q._1, s._1, (if q._2 == s._2 1 else 0, etc)
)
)
这一切都有效,如果我保持数字低,如26米报价,但只有1000销售,但如果我运行它将所有销售它只是停止响应时运行
我'用下面的配置运行它。
spark-submit --conf spark.akka.frameSize=1024 --conf spark.executor.memory=3g --num-executors=30 --driver-memory 6g --class SalesMatch --deploy-mode client --master yarn SalesMatching-0.0.1-SNAPSHOT.jar hdfs://cluster:8020/data_import/Sales/SourceSales/2014/09/01/SourceSales_20140901.txt hdfs://cluster:8020/data_import/CDS/Enquiry/2014/01/01/EnquiryBackFill_20140101.txt hdfs://cluster:8020/tmp/_salesdata_matches_new
有没有什么东西跳出来显然不正确?
这要看情况。从RDD创建广播是一个昂贵的过程,尤其是当RDD很大时(单个销售记录的大小是多少?)。首先,您必须将所有数据传输到驱动程序,然后分发给工作人员。这意味着重复的序列化/反序列化,网络流量和存储数据的成本。 – zero323
@ zero323一旦我已经预测出我感兴趣的属性,它对于11m的销售量最终只有40mb。在修剪为必需的属性后,引号结束大约3GB。鉴于3节点盒子上的这些尺寸,我不知道为什么它如此缓慢。 (即使接受q * s性质的结果 – owen79
因此,广播不太可能是一个问题,另一个腥味是笛卡尔产品本身,每个分区10k报价和11M销售,每个分区产生大约4TB(11e11条目)if我正确的计算,我有理由在flatMap中过滤相似性很低的文件, – zero323