我正在研究写入S3后会自动将表和分区注册到配置单元元存储的内容。从Spark数据集获取配置单元分区
在我可以注册所有分区之前,我需要知道所有的分区值。现在我正在做ds.select(partitionColumn).distinct().collectAsList();
来获取所有的分区值。
有没有更好的方法从我的数据集中获取分区值?
我正在研究写入S3后会自动将表和分区注册到配置单元元存储的内容。从Spark数据集获取配置单元分区
在我可以注册所有分区之前,我需要知道所有的分区值。现在我正在做ds.select(partitionColumn).distinct().collectAsList();
来获取所有的分区值。
有没有更好的方法从我的数据集中获取分区值?
读取Spark源代码后,特别是AlterTableRecoverPartitionsCommand
,org.apache.spark.sql.execution.command.ddl.scala
,这是Spark实现的ALTER TABLE RECOVER PARTITIONS
。它扫描所有分区,然后注册它们。
因此,这里是相同的想法,扫描我们刚写入的位置的所有分区。
从中获取密钥名称,然后从中提取分区名称/值。
以下是获取路径的代码片段。
String location = "s3n://somebucket/somefolder/dateid=20171010/";
Path root = new Path(location);
Configuration hadoopConf = sparkSession.sessionState().newHadoopConf();
FileSystem fs = root.getFileSystem(hadoopConf);
JobConf jobConf = new JobConf(hadoopConf, this.getClass());
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf);
FileStatus[] fileStatuses = fs.listStatus(root, path -> {
String name = path.getName();
if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) {
return pathFilter == null || pathFilter.accept(path);
} else {
return false;
}
});
for(FileStatus fileStatus: fileStatuses) {
System.out.println(fileStatus.getPath().getName());
}
基于这种方法,我们可以扩展当前的过滤器来完成额外的工作。 折衷是我们得到的路径不正是我们写到,如果SaveMode不覆盖。在我的情况下,我现在只将它用于覆盖模式。 –
AWS胶水已经为你做这个。 –
我不知道更好的解决方案,这也是我的做法 –
@ThiagoBaldim我们看过AWS Glue,但它似乎并不允许我们将其用作外部产品的Metastore服务。像Tableau,Databricks等... –