2014-11-08 32 views
0

我是一个新手火花。我正在使用python(pyspark)编写我的程序。我使用groupByKey函数将键 - 值对转换为键 - (值列表)对。我在64核心计算机上运行spark,并尝试使用以下命令启动程序来使用所有64个内核。星火不利用任何减少parallization

spark-submit --master local[64] my_program.py 

然而,我注意到,在执行groupByKey功能,正在使用只有一个核心。数据相当大。那么,为什么spark不会将其分割成64个分区,并在64个不同的核心中进行缩减/分组?

我错过了一些并行化的重要步骤?

代码的相关部分看起来像这样,

# Here input itself is a key-(list of values) pair. The mapPartitions 
# function is used to return a key-value pair (variable x), from 
# which another key-(list of values) pair is created (variable y) 
x = input.mapPartitions(transFunc) 
# x contains key value pair, such as [(k1, v1), (k1, v2), (k2, v3)] 
y = x.groupByKey() 
# y contains key-list of values pair such as [(k1, [v1, v2]), (k2, [v2])] 
+0

你是如何加载你的数据? – maasg 2014-11-08 11:42:34

+0

@maasg:我使用mapPartitions。在mapPartitions之后,say变量x中的结果数据是一个键值对,其中key是一个字符串,值也是一个字符串。然后,我使用groupByKey组成键(值列表)对的关键字,其中键与x中的键相同,并且值列表是字符串值列表。 – MetallicPriest 2014-11-08 11:51:53

+0

您可以将代码添加到问题中吗? – maasg 2014-11-08 11:56:10

回答

1

Spark中的默认并行级别是由配置选项决定:spark.default.parallelism。的缺省值是:(*从docs

本地模式:8个其他::核心的本地机器Mesos细粒度 模式上编号的所有执行器节点或2上的核心总数, 取其较大

RDDS可以使用这些操作更多或更少的分区被重新组合:

rdd.repartition(partitions: Int) // redistributes the RDD into the given nr of partitions 
rdd.coalesce(partitions:Int) // reduces the number of partitions of the RDD to the given nr 

需要一个内部整理操作经常采取numPartitions参数来指定目标分区的数量。在这样的操作之后,RDD将具有新的分区数量。 让我说明了一个例子:

考虑:

val rdd = sc.textFile("localFile") // default nr of partitions. Let's say 2 

然后:

val moreParallelRdd = rdd.repartition(64) // 64 partitions 
val onePartitionRdd = moreParallelRdd.coalesce(1) // 1 partition 
val sortedRdd = onePartitionRdd.sortBy(x=> sortSelector(x), numPartitions=10) // 10 partitions