2015-06-15 120 views
31

尝试使用火花壳阅读位于S3文件:星火使用sc.textFile从S3读取文件(“S3N:// ...)

scala> val myRdd = sc.textFile("s3n://myBucket/myFile1.log") 
lyrics: org.apache.spark.rdd.RDD[String] = s3n://myBucket/myFile1.log MappedRDD[55] at textFile at <console>:12 

scala> myRdd.count 
java.io.IOException: No FileSystem for scheme: s3n 
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2607) 
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2614) 
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) 
    ... etc ... 

IOException异常:没有文件系统的方案:S3N有发生错误:

  • 星火1.31或1.40的开发机器上(没有Hadoop的LIBS)
  • Hortonworks Sandbox HDP v2.2.4(Hadoop的2.60),它集成星火1.2.1开箱运行
  • 使用s3://或s3n://方案

这个错误的原因是什么?缺少依赖性,缺少配置或者错误使用sc.textFile()

或者这可能是由于一个影响Hadoop 2.60特定的Spark构建的错误,因为这个post似乎暗示。我将尝试使用Spark for Hadoop 2.40来查看是否解决了这个问题。

回答

33

确认这是有关对Hadoop的2.60星火构建。刚刚安装了Spark 1.4.0 "Pre built for Hadoop 2.4 and later"(而不是Hadoop 2.6)。代码现在可以正常工作。

sc.textFile("s3n://bucketname/Filename")现在提出了另一个错误:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively). 

下面的代码使用S3 URL格式显示,星火可以读取S3文件。使用开发机器(无Hadoop库)。

scala> val lyrics = sc.textFile("s3n://MyAccessKeyID:[email protected]/SafeAndSound_Lyrics.txt") 
lyrics: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21 

scala> lyrics.count 
res1: Long = 9 

甚至更​​好:与AWS凭据内嵌在S3N URI上面的代码将打破,如果AWS秘密密钥具有前“/”。在SparkContext中配置AWS Credentials可以解决这个问题。无论S3文件是公共还是私人,代码都可以工作。

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "BLABLA") 
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "....") // can contain "/" 
val myRDD = sc.textFile("s3n://myBucket/MyFilePattern") 
myRDD.count 
+1

Spark 1.6.0与Hadoop 2.4为我工作。 Hadoop 2.6的Spark 1.6.0没有。 –

+1

@PriyankDesai对于有相同问题的其他人,请参阅https://issues.apache.org/jira/browse/SPARK-7442以及评论部分中的链接。 – timss

+0

请参阅下面的答案,解释为什么它不适用于Hadoop 2.6版本。 –

8

这是一个样的火花代码可以读取出现在S3

val hadoopConf = sparkContext.hadoopConfiguration 
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
hadoopConf.set("fs.s3.awsAccessKeyId", s3Key) 
hadoopConf.set("fs.s3.awsSecretAccessKey", s3Secret) 
var jobInput = sparkContext.textFile("s3://" + s3_location) 
1

文件你可能不得不使用S3A:/方案,而不是S3的:/或S3N:/ 然而,对于火星壳而言,它并不适合我(箱)。我看到下面的堆栈跟踪:

java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found 
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) 
     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) 
     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) 
     at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) 
     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) 
     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) 
     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) 
     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) 
     at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) 
     at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) 
     at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) 
     at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:207) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
     at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
     at scala.Option.getOrElse(Option.scala:120) 
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1781) 
     at org.apache.spark.rdd.RDD.count(RDD.scala:1099) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:24) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:29) 
     at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:31) 
     at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) 
     at $iwC$$iwC$$iwC$$iwC.<init>(<console>:35) 
     at $iwC$$iwC$$iwC.<init>(<console>:37) 
     at $iwC$$iwC.<init>(<console>:39) 
     at $iwC.<init>(<console>:41) 
     at <init>(<console>:43) 
     at .<init>(<console>:47) 
     at .<clinit>(<console>) 
     at .<init>(<console>:7) 
     at .<clinit>(<console>) 
     at $print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found 
     at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980) 
     at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072) 
     ... 68 more 

我想什么 - 你必须手动添加Hadoop的AWS依赖手动http://search.maven.org/#artifactdetails|org.apache.hadoop|hadoop-aws|2.7.1|jar但我不知道如何将它添加到适当的火花外壳。

+1

使用逗号分隔的'--jars'参数将jar的路径添加到spark-shell。您还需要添加'aws-java-sdk - * - jar'。 –

2

火花1.4.x的 “预建Hadoop的2.6及更高版本”:

我只是复制所需的S3,从Hadoop的AWS-2.6.0.jar S3native包 火花组装1.4.1 -hadoop2.6.0.jar。

之后,我重新启动火花集群,它的工作原理。 不要忘记检查装配罐的所有者和模式。 对您所提交:

14

您可以用适当的jar添加的--packages参数

bin/spark-submit --packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.6.0 code.py 
+0

看起来很有希望,但是当我使用'spark-submit -packages com。'进行'file:/ home/jcomeau/.m2/repository/asm/asm/3.2/asm-3.2.jar'时, amazonaws:aws-java-sdk-pom:1.11.83,org.apache.hadoop:hadoop-aws:2.7.3 merge.py'。有任何想法吗? –

23

尽管这个问题已经被接受的答案,我认为,为什么这个具体细节正在发生仍然失踪。所以我认为可能有一个地方可以提供更多的答案。

如果添加所需的hadoop-aws依赖项,您的代码应该可以工作。

启动Hadoop 2.6.0,s3 FS连接器已被移动到名为hadoop-aws的单独库中。 还有一个吉拉: Move s3-related FS connector code to hadoop-aws

这意味着,针对Hadoop 2.6.0或更高版本构建的任何Spark版本都必须使用其他外部依赖关系才能连接到S3文件系统。
下面是一个SBT的例子,我曾尝试,并正在为使用Apache 1.6.2星火对内置的Hadoop 2.6.0预期:

libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0"

在我的情况,我遇到了一些问题的依赖性,所以我被解决加入排除:

libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" exclude("tomcat", "jasper-compiler") excludeAll ExclusionRule(organization = "javax.servlet")

在其他相关的说明,我还没有尝试,但它建议使用“S3A”,而不是“S3N”文件系统开始的Hadoop 2.6.0。

The third generation, s3a: filesystem. Designed to be a switch in replacement for s3n:, this filesystem binding supports larger files and promises higher performance.

2

有一个Spark JIRA,SPARK-7481,今天的开放,2016年10月20日,加一个火花云模块,它包括一切S3A和蔚蓝wasb传递依赖:需要与测试一起。

Spark PR匹配。这就是我如何在我的火花构建中获得S3a支持的方法

如果您手动完成,您必须获得hadoop-aws JAR的准确版本,其余的hadoop JARS以及一个版本的AWS JAR 100%与Hadoop aws编译的目标同步。 Hadoop的2.7。{1,2,3,...}

hadoop-aws-2.7.x.jar 
aws-java-sdk-1.7.4.jar 
joda-time-2.9.3.jar 
+ jackson-*-2.6.5.jar 

棒所有这些成SPARK_HOME/jars。使用您的凭据运行火花设立在信封瓦尔或spark-default.conf

最简单的测试,你可以做一个CSV的行数文件

val landsatCSV = "s3a://landsat-pds/scene_list.gz" 
val lines = sc.textFile(landsatCSV) 
val lineCount = lines.count() 

获取一个数字:一切都很好。获取堆栈跟踪。坏消息。

4

在Spark 2.0.2中遇到同样的问题。通过喂它的罐子来解决它。这是我跑:

$ spark-shell --jars aws-java-sdk-1.7.4.jar,hadoop-aws-2.7.3.jar,jackson-annotations-2.7.0.jar,jackson-core-2.7.0.jar,jackson-databind-2.7.0.jar,joda-time-2.9.6.jar 

scala> val hadoopConf = sc.hadoopConfiguration 
scala> hadoopConf.set("fs.s3.impl","org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
scala> hadoopConf.set("fs.s3.awsAccessKeyId",awsAccessKeyId) 
scala> hadoopConf.set("fs.s3.awsSecretAccessKey", awsSecretAccessKey) 
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
scala> sqlContext.read.parquet("s3://your-s3-bucket/") 

很明显,你需要有你的出发

+0

我也遇到了Spark 2.1.0的这个问题,并将“spark-defaults.conf”的最新aws要求(spark.jars.packages org.apache.hadoop:hadoop-aws:2.7.3) 添加到“spark-defaults.conf”招。 python中的 – Kieleth

0

使用S3A,而不是S3N运行火花壳路径罐子。我在Hadoop工作上遇到了类似的问题。从s3n切换到s3a后,它工作。

例如

s3a://myBucket/myFile1.log

0

我正面临同样的问题。在设置fs.s3n.impl的值并添加hadoop-aws依赖关系后,它工作正常。

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", awsAccessKeyId) 
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey) 
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") 
+0

:AttributeError:'SparkContext'对象没有属性'hadoopConfiguration' –

相关问题