我是Spark新手。我有一个简单的pyspark脚本。它读取一个json文件,将其平滑并将其作为parquet压缩文件写入S3位置。PySpark输出文件数
读取和转换步骤运行速度非常快,并且使用50个执行人(我在conf设置)。但写入阶段需要很长时间,只写入一个大文件(480MB)。
怎样的文件的数量保存的决定? 写操作可以以某种方式加快吗?
感谢, 拉姆。
我是Spark新手。我有一个简单的pyspark脚本。它读取一个json文件,将其平滑并将其作为parquet压缩文件写入S3位置。PySpark输出文件数
读取和转换步骤运行速度非常快,并且使用50个执行人(我在conf设置)。但写入阶段需要很长时间,只写入一个大文件(480MB)。
怎样的文件的数量保存的决定? 写操作可以以某种方式加快吗?
感谢, 拉姆。
文件输出的数目等于的RDD被保存分区的数量。在此示例中,RDD被重新分区以控制输出文件的数量。
尝试:
repartition(numPartitions) - 洗牌的数据中随机RDD到 创造更多或更少的分区并在它们之间进行平衡。 这总是通过网络混洗所有数据。
>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test")
文件输出的数目是一样的RDD的partitionds的数目。
$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test
Found 3 items
-rw-r--r-- 1 cloudera cloudera 0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000
-rw-r--r-- 1 cloudera cloudera 1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001
更新:
的textFile method也花费 控制文件的分区的数目的可选的第二个参数。默认情况下,星火 创建该文件的每个块一个分区(块被 默认HDFS 64MB),但你也可以通过传递一个较大的值,要求有较高的一些 分区。请注意,分区的块数不能少于 。
...但这是可能分区的最小数目,所以它们不能保证。
,所以如果你想在读取分区,你应该使用这个....
dataRDD=sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2)
谢谢!重新分配何时发生?读取期间是否可以对RDD进行分区?还是它必须是一个单独的步骤? – Ram
@Ram - 看到更新的答案 - 如果我的努力帮助您解决问题,请接受我的答案是公认的答案(点击正确的符号旁边的上/下箭头上面,也单击向上箭头)欢呼:) –
应该提到在减少分区数量的情况下,人们应该更倾向于'重新分配'而不是'重新分配',因为它避免了全面洗牌。这是因为Spark知道它可以将数据保留在所需的分区数量上,只将数据从额外的节点上移走。 – user4601931