2015-07-10 62 views
3

我有一个非常简单的SparkSQL连接到Postgres数据库的设置,我试图从表中获取DataFrame,其数据分区数为X(可以说是2)。代码如下:SparkSQL PostgresQL Dataframe分区

Map<String, String> options = new HashMap<String, String>(); 
options.put("url", DB_URL); 
options.put("driver", POSTGRES_DRIVER); 
options.put("dbtable", "select ID, OTHER from TABLE limit 1000"); 
options.put("partitionColumn", "ID"); 
options.put("lowerBound", "100"); 
options.put("upperBound", "500"); 
options.put("numPartitions","2"); 
DataFrame housingDataFrame = sqlContext.read().format("jdbc").options(options).load(); 

由于某种原因,DataFrame的一个分区几乎包含所有行。

对于我所能理解的lowerBound/upperBound是用来微调的参数。在SparkSQL的文档(Spark 1.4.0 - spark-sql_2.11)中,它表示它们用于定义跨度,而不是过滤/范围分区列。但是这会产生以下几个问题:

  1. 步幅是每个执行器(分区)用于查询数据库的频率(每个查询返回的元素数)?
  2. 如果没有,这个参数的目的是什么,它们依赖于什么以及如何平衡我的DataFrame分区(不要求所有分区包含相同数量的元素,只是存在均衡 - 例如2个分区100个元素55/45,60/40或甚至65/35将会这样做)

似乎无法找到这些问题的明确答案,并想知道是否有些人可以清除这对我来说意义重大,因为在处理X百万行时,现在影响了我的集群性能,而所有繁重的工作都由一位执行者负责。

干杯和感谢您的时间。

回答

5

实际上,分区的下限和上限以及分区的数量用于计算每个并行任务的增量或拆分。

假设表有分区列“年”,并有从2006年到2016年

数据如果定义分区数为10,与下界2006年和上界到2016年,你将有各任务获取本年度数据 - 理想情况。

即使您错误地指定了下限和/或上限,例如,设置lower = 0和upper = 2016时,数据传输将会出现偏差,但是,您将不会“丢失”或检索不到任何数据,因为:

第一项任务将获取< 0年的数据。

第二项任务将获取0到2016/10之间的数据。

第三项任务将在2016/10和2 * 2016/10之间提取年度数据。

...

而最后的任务将与年 - > 2016年where条件。

T.

0

下界和上界已经被目前确定做他们在以前的答案做什么。对此的后续处理是如何在不考虑最小最大值或者数据严重偏斜的情况下跨分区平衡数据。

如果你的数据库支持“散列”功能,它可以做到这一点。

partitionColumn = “散列(列)%num_partitions”

numPartitions = 10 //无论你想

下界= 0

UPPERBOUND = numPartitions

只要这将工作模数运算返回[0,numPartitions)上的均匀分布