2016-08-24 82 views
1

我具有与每个40GB存储器的四个节点Hadoop集群(MAPR)。我需要在大数据集的一个字段上“应用”一个函数(500万行)。我的代码的流程是,我读了蜂巢表中的数据作为一个火花数据帧和应用上的一列所需的功能如下:地图变换性能火花数据帧VS RDD

schema = StructType([StructField("field1", IntegerType(), False), StructField("field2", StringType(), False),StructField("field3", FloatType(), False)]) 
udfCos = udf(lambda row: function_call(row), schema) 
result = SparkDataFrame.withColumn("temp", udfCos(stringArgument)) 

类似的RDD版本可能像下面这样:

result = sparkRDD.map(lambda row: function_call(row)) 

我想提高这段代码的性能,我确保代码以最大并行度和降低的吞吐量运行 - 我需要帮助使用SparkConf中的'重新分配''并行性值之类的火花概念'或其他方法,在我的问题的背景下。任何帮助表示赞赏。

我的火花启动参数:

MASTER="yarn-client" /opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 10 --driver-cores 10 --driver-memory 30g --executor-memory 7g --executor-cores 5 --conf spark.driver.maxResultSize="0" --conf spark.default.parallelism="150" 
+0

对于初学者不要使用Python的UDF。 – zero323

+0

该建议背后的任何具体推理?我没有这导致对RDD地图和UDF(与默认值) – Mike

+2

一般这种往返JVM同一运行时持续时间的样品测试 - >的Python - > JVM是昂贵的和相对慢的,并有一些其他难看特性(特别是在火花<2 ),所以如果你可以的话,你应该更喜欢原生的(JVM)函数组合在UDF上。 – zero323

回答

0

对于调整你的应用程序,你需要知道一些事情

1)你需要监视你的应用集群是利用不足或没有太多的资源如何通过已创建

监控可以使用各种工具如做你的应用程序中使用。 Ganglia从Ganglia你可以找到CPU,内存和网络使用情况。根据观察有关CPU和内存使用

2)你可以得到一个更好的主意,需要什么样的调整,为您的应用程序,你

形式星火点

在火花defaults.conf

您可以指定需要什么样的序列化您的应用程序需要多少驱动程序内存和执行程序内存,即使您可以更改垃圾回收算法。

下面是几个例子,你可以调整根据您的要求

spark.serializer     org.apache.spark.serializer.KryoSerializer 
spark.executor.extraJavaOptions -XX:MaxPermSize=2G -XX:+UseG1GC 
spark.driver.extraJavaOptions -XX:MaxPermSize=6G -XX:+UseG1GC 

了解更多详情这个参数是指http://spark.apache.org/docs/latest/tuning.html