2017-10-12 40 views
0

我正在研究写入S3后会自动将表和分区注册到配置单元元存储的内容。从Spark数据集获取配置单元分区

在我可以注册所有分区之前,我需要知道所有的分区值。现在我正在做ds.select(partitionColumn).distinct().collectAsList();来获取所有的分区值。

有没有更好的方法从我的数据集中获取分区值?

+0

AWS胶水已经为你做这个。 –

+0

我不知道更好的解决方案,这也是我的做法 –

+0

@ThiagoBaldim我们看过AWS Glue,但它似乎并不允许我们将其用作外部产品的Metastore服务。像Tableau,Databricks等... –

回答

0

读取Spark源代码后,特别是AlterTableRecoverPartitionsCommand,org.apache.spark.sql.execution.command.ddl.scala,这是Spark实现的ALTER TABLE RECOVER PARTITIONS。它扫描所有分区,然后注册它们。

因此,这里是相同的想法,扫描我们刚写入的位置的所有分区。

从中获取密钥名称,然后从中提取分区名称/值。

以下是获取路径的代码片段。

String location = "s3n://somebucket/somefolder/dateid=20171010/"; 
Path root = new Path(location); 

Configuration hadoopConf = sparkSession.sessionState().newHadoopConf(); 
FileSystem fs = root.getFileSystem(hadoopConf); 

JobConf jobConf = new JobConf(hadoopConf, this.getClass()); 
final PathFilter pathFilter = FileInputFormat.getInputPathFilter(jobConf); 

FileStatus[] fileStatuses = fs.listStatus(root, path -> { 
    String name = path.getName(); 
    if (name != "_SUCCESS" && name != "_temporary" && !name.startsWith(".")) { 
     return pathFilter == null || pathFilter.accept(path); 
    } else { 
     return false; 
    } 
}); 

for(FileStatus fileStatus: fileStatuses) { 
    System.out.println(fileStatus.getPath().getName()); 
} 
+0

基于这种方法,我们可以扩展当前的过滤器来完成额外的工作。 折衷是我们得到的路径不正是我们写到,如果SaveMode不覆盖。在我的情况下,我现在只将它用于覆盖模式。 –

相关问题