1

我使用以下Scala代码在S3中使用AWS EMR上的Apache Spark创建文本文件。使用EMR上的Spark写入S3中的文件

def createS3OutputFile() { 
    val conf = new SparkConf().setAppName("Spark Pi") 
    val spark = new SparkContext(conf) 
    // use s3n ! 
    val outputFileUri = s"s3n://$s3Bucket/emr-output/test-3.txt" 
    val arr = Array("hello", "World", "!") 
    val rdd = spark.parallelize(arr) 
    rdd.saveAsTextFile(outputFileUri) 
    spark.stop() 
    } 

def main(args: Array[String]): Unit = { 
    createS3OutputFile() 
    } 

我创建一个胖JAR并将其上传到S3。然后我SSH到集群主设备和运行的代码:

spark-submit \ 
    --deploy-mode cluster \ 
    --class "$class_name" \ 
    "s3://$s3_bucket/$app_s3_key" 

我看到这个在S3控制台:不是文件有文件夹。

enter image description here

每个文件夹(例如测试3.txt)包含的块的文件的长列表。如下图:

enter image description here

我如何输出一个简单的文本文件,以S3作为我的星火作业的输出?

回答

4

尝试这样做:

rdd.coalesce(1, shuffle = true).saveAsTextFile(...)

我的理解是,shuffle = true参数将导致此并行发生,因此会输出一个文本文件,但千万要小心庞大的数据文件。

Here是关于这个问题的更多细节。

+0

我是新来斯卡拉和火花。从Spark工作中获得结果的“规范”方式是什么?它是否以我的方式写入文件?它是否使用本机Java函数创建文件? – BlackSheep

+0

@BlackSheep真的取决于手头的任务,真的没有官方的方式。至于创建文件,每个RDD分区都会执行您告诉它执行的任务,因此在这种情况下,每个RDD分区将创建一个文件并写入其自己的文件。 – TheM00s3

+0

这仍然会产生文件夹结构,但只有一个孩子'part-00000'。 – Tim

-1

Spark是分布式计算。这意味着您的代码正在多个节点上运行。

saveAsTextFile()方法接受file path,而不是文件名。

您可以使用​​3210()或repartition来减少零件文件的数量。但它仍然会在文件路径下创建。

或者,您可以使用Hadoop File SystemFileUtil类更改文件名或合并多个零件文件为单个零件文件。

商店RDD至S3

rdd.saveAsTextFile("s3n://bucket/path/") 

此外,检查this

+0

您可以举一个例子,将作业的输出作为文件保存到S3中吗?你可以选择最简单的输出 - 一个计数(#)。如何将其保存为EMR中的输出? – BlackSheep

+0

@BlackSheep:更新了我的答案,将rdd存储为s3。 – Shankar

+1

'saveAsTextFile'就像我在问题中所说的那样。这怎么解决我的问题? – BlackSheep