
Unable to read a file over FTP with SparkContext.textFile(...) on Google Dataproc

I am running a Spark cluster on Google Dataproc and I am running into problems when trying to read a GZipped file from FTP with sparkContext.textFile(...).

The code I am running is:

import org.apache.spark.{SparkConf, SparkContext}

object SparkFtpTest extends App { 
    // SparkContext created here so the snippet is self-contained
    val sc = new SparkContext(new SparkConf().setAppName("SparkFtpTest")) 
    val file = "ftp://username:password@host:21/filename.txt.gz" 
    val lines = sc.textFile(file) 
    lines.saveAsTextFile("gs://my-bucket-storage/tmp123") 
} 

and the error I get is:

Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication. 

I have seen people suggesting that the credentials are wrong, so I tried entering deliberately wrong credentials and the error was different, i.e. invalid login credentials.

If I copy that URL into a browser, the file downloads correctly and is usable.

It is also worth mentioning that I have tried using the Apache commons-net library directly (the same version as in Spark-2.2) and that worked fine: I was able to retrieve the data (from both the master and the worker nodes). What I could not do was decompress it (using Java's GZIPInputStream; I don't remember the exact failure, but I can try to reproduce it if you think it matters). I think this shows it is not some firewall problem on the cluster, even though I was not able to download the file with curl.
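For reference, a manual commons-net read of such a file can be sketched like this (a minimal sketch only; the host, credentials, and file name are placeholders, and error handling is omitted):

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

import org.apache.commons.net.ftp.{FTP, FTPClient}

object ManualFtpRead extends App {
  val client = new FTPClient()
  client.connect("host", 21)             // placeholder host
  client.login("username", "password")   // placeholder credentials
  client.enterLocalPassiveMode()
  client.setFileType(FTP.BINARY_FILE_TYPE)

  // The returned stream is still GZipped; wrap it to decompress.
  val in = new GZIPInputStream(client.retrieveFileStream("/filename.txt.gz"))
  val reader = new BufferedReader(new InputStreamReader(in))

  // Print the first few lines to verify download and decompression.
  Iterator.continually(reader.readLine()).takeWhile(_ != null).take(5).foreach(println)

  reader.close()
  client.completePendingCommand()
  client.logout()
  client.disconnect()
}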

I believe I ran the same code from my local machine a few months ago and, if I remember correctly, it worked fine.

Do you have any idea what is causing this problem? Could it be some kind of dependency conflict, and if so, which one?

I have a few dependencies in the project, e.g. google-sdk, solrj, ... However, if it were a dependency problem, I would expect to see something like a ClassNotFoundException or a NoSuchMethodError.

The whole stack trace looks like this:

16/12/05 23:53:46 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/ 
16/12/05 23:53:47 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache 
16/12/05 23:53:49 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/0/ 
16/12/05 23:53:50 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/0/ - removing from cache 
16/12/05 23:53:50 INFO com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Populating missing itemInfo on-demand for entry: gs://my-bucket-storage/tmp123/_temporary/ 
16/12/05 23:53:51 WARN com.google.cloud.hadoop.gcsio.CacheSupplementedGoogleCloudStorage: Possible stale CacheEntry; failed to fetch item info for: gs://my-bucket-storage/tmp123/_temporary/ - removing from cache 
Exception in thread "main" org.apache.commons.net.ftp.FTPConnectionClosedException: Connection closed without indication. 
    at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:298) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:495) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:537) 
    at org.apache.commons.net.ftp.FTP.sendCommand(FTP.java:586) 
    at org.apache.commons.net.ftp.FTP.quit(FTP.java:794) 
    at org.apache.commons.net.ftp.FTPClient.logout(FTPClient.java:788) 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.disconnect(FTPFileSystem.java:151) 
    at org.apache.hadoop.fs.ftp.FTPFileSystem.getFileStatus(FTPFileSystem.java:395) 
    at org.apache.hadoop.fs.FileSystem.globStatusInternal(FileSystem.java:1701) 
    at org.apache.hadoop.fs.FileSystem.globStatus(FileSystem.java:1647) 
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:222) 
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:270) 
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:199) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:248) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:246) 
    at scala.Option.getOrElse(Option.scala:121) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:246) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1906) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1219) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1161) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1064) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1030) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:956) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:955) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1459) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1438) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:358) 
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1438) 

What Dataproc image version are you using?


This is the script I use to create the cluster, so I would expect it to be the latest (1.1): 'gcloud dataproc clusters create $cluster_name --zone $zone --num-workers $num_workers --initialization-actions $init_actions --master-machine-type $master_machine_type --worker-machine-type $worker_machine_type --scopes datastore'

Answer


It looks like this may be a known, unresolved issue in Spark/Hadoop: https://issues.apache.org/jira/browse/HADOOP-11886 and https://github.com/databricks/learning-spark/issues/21 both hint at a similar stack trace.

Since you were able to use the Apache commons-net library manually, you could achieve the same effect as sc.textFile by obtaining the list of files, parallelizing that list of filenames as an RDD, and using a flatMap where each task takes a filename and reads the file line by line, producing the collection of output lines for that file. Alternatively, if the amount of data on the FTP server is small (maybe up to 10 GB at most), parallel reads won't help much compared with a single thread copying it from the FTP server into HDFS or GCS in your Dataproc cluster and then processing the HDFS or GCS path in your Spark job.
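For illustration, a minimal sketch of that flatMap approach might look like this (assuming commons-net is on the executor classpath; the host, credentials, and file list are placeholders, and error handling is omitted):

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream

import org.apache.commons.net.ftp.{FTP, FTPClient}

// Parallelize the list of remote file names so each task reads one file.
val fileNames = Seq("/filename1.txt.gz", "/filename2.txt.gz")  // placeholder list

val lines = sc.parallelize(fileNames, fileNames.size).flatMap { path =>
  val client = new FTPClient()
  client.connect("host", 21)             // placeholder host
  client.login("username", "password")   // placeholder credentials
  client.enterLocalPassiveMode()
  client.setFileType(FTP.BINARY_FILE_TYPE)

  // Decompress the GZipped stream and read it line by line.
  val in = new GZIPInputStream(client.retrieveFileStream(path))
  val reader = new BufferedReader(new InputStreamReader(in))

  // Materialize all lines before closing the connection.
  val result = Iterator.continually(reader.readLine()).takeWhile(_ != null).toList

  reader.close()
  client.completePendingCommand()
  client.logout()
  client.disconnect()
  result
}

lines.saveAsTextFile("gs://my-bucket-storage/tmp123")

If the data is small, the second option is usually simpler: copy it once to HDFS or GCS and then point sc.textFile at that path as usual.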


Thank you Dennis. This had really been bothering me, but it makes sense now. I implemented the manual solution by getting the list of files. However, since all of the files are small, I like your alternative solution even more.