2016-12-28 43 views
0

我是Spark新手。我有一个简单的pyspark脚本。它读取一个json文件,将其平滑并将其作为parquet压缩文件写入S3位置。PySpark输出文件数

读取和转换步骤运行速度非常快,并且使用50个执行人(我在conf设置)。但写入阶段需要很长时间,只写入一个大文件(480MB)。

怎样的文件的数量保存的决定? 写操作可以以某种方式加快吗?

感谢, 拉姆。

回答

1

文件输出的数目等于的RDD被保存分区的数量。在此示例中,RDD被重新分区以控制输出文件的数量。

尝试:

repartition(numPartitions) - 洗牌的数据中随机RDD到 创造更多或更少的分区并在它们之间进行平衡。 这总是通过网络混洗所有数据。

>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test") 

文件输出的数目是一样的RDD的partitionds的数目。

$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test 
Found 3 items 
-rw-r--r-- 1 cloudera cloudera   0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS 
-rw-r--r-- 1 cloudera cloudera 1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000 
-rw-r--r-- 1 cloudera cloudera 1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001 

而且检查:coalesce(numPartitions)

source-1 | source-2


更新:

textFile method也花费 控制文件的分区的数目的可选的第二个参数。默认情况下,星火 创建该文件的每个块一个分区(块被 默认HDFS 64MB),但你也可以通过传递一个较大的值,要求有较高的一些 分区。请注意,分区的块数不能少于 。

...但这是可能分区的最小数目,所以它们不能保证。

,所以如果你想在读取分区,你应该使用这个....

dataRDD=sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2) 
+0

谢谢!重新分配何时发生?读取期间是否可以对RDD进行分区?还是它必须是一个单独的步骤? – Ram

+1

@Ram - 看到更新的答案 - 如果我的努力帮助您解决问题,请接受我的答案是公认的答案(点击正确的符号旁边的上/下箭头上面,也单击向上箭头)欢呼:) –

+1

应该提到在减少分区数量的情况下,人们应该更倾向于'重新分配'而不是'重新分配',因为它避免了全面洗牌。这是因为Spark知道它可以将数据保留在所需的分区数量上,只将数据从额外的节点上移走。 – user4601931