2017-07-19 56 views
0

我正尝试阅读scala中Paths的Sequence文件。下面是样本(伪)代码:Spark:只有在路径存在的情况下才能读取文件

val paths = Seq[String] //Seq of paths 
val dataframe = spark.read.parquet(paths: _*) 

现在,在上面的序列中,存在一些路径,而有些则不存在。在阅读parquet文件(避免org.apache.spark.sql.AnalysisException: Path does not exist)时,有什么方法可以忽略丢失的路径吗?

我曾尝试以下,似乎是工作,但后来,我结束了读同一两次路径这是我想避免这样做:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess) 

我检查了options方法DataFrameReader但似乎没有任何选项类似于ignore_if_missing

而且,这些路径可以是hdfss3(这Seq被作为方法参数传递)和在阅读,我不知道一个路径是否是s3hdfs所以不能用s3hdfs特定API来检查存在。

回答

1

如何过滤paths firstly`:

paths.filter(f => new java.io.File(f).exists) 

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists) 
// res18: List[String] = List(/tmp) 
+0

“路径”可以是本地的'hdfs'路径或's3'路径。不确定'File.exists'是否适用于's3'。 –

+1

如果路径是HDFS/S3路径(通常与Spark一起使用),那么需要稍微不同的API来检查路径存在。 [@DarshanMehta你击败了我3秒:)] –

+0

@TzachZohar哈哈是的。我现在已经更新了这个问题。 –

4

您可以过滤掉不相关的文件,如@ Psidom的答案。在火花中,最好的方法是使用内部火花hadoop配置。鉴于火花会话变量被称为“火花”,你可以这样做:

import org.apache.hadoop.fs.FileSystem 
import org.apache.hadoop.fs.Path 

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) 

def testDirExist(path: String): Boolean = { 
    val p = new Path(path) 
    hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory 
} 
val filteredPaths = paths.filter(p => testDirExists(p)) 
val dataframe = spark.read.parquet(filteredPaths: _*) 
+0

根据您的系统设置,您可能需要在get:FileSystem.get(new URI(“s3:// bucket”),spark.sparkContext.hadoopConfiguration)中指定文件系统位置。否则,它可能会创建一个HDFS文件系统和barf来检查S3文件系统的路径。 – Azuaron

0

也许这样的事情可以为你工作?

def read(path: Seq[String]): Try[DataFrame] = Try(spark.read.parquet(p)) 


read("somePath") match { 
    case Success(df) => df.show() 
    case Failure(_) => Unit 
} 
相关问题