2017-03-24 22 views
1

我在这里阅读这篇文章:https://spark.apache.org/docs/latest/programming-guide.html(请参阅将函数传递给Spark),但我的用例是使用类型化数据集与我的案例类。我试图使用单身对象来保存映射方法。我想知道如何打包我需要的功能来优化我的舞台的性能(将数据集从一种类型转换为另一种类型,然后写入实木复合地板)。当使用数据集,大型Java类和单例时,Spark传递函数

目前,阶段性步骤花费了大约300万行(〜1.5小时)的难以置信的长时间,大约880 MB数据输出到s3实木复合地板。

我在集群模式下运行,使用最少执行程序= 3,最大执行程序= 10,每个执行程序有4个内核,驱动程序内存8GB。

-

高层次的编码部分:

我映射一个案例类C1到另一个案例类C2。 C1和C2有大约16个字段,各种类型,如java.sql.Timestamp,Option [String] Option [Int],String,Int,BigInt。

case class C1(field1 : _, field2 : _, field3 : _, ...) 
case class C2(field1 : _, field2 : _, field3 : _, ...) 

为了从C1至C2映射,我需要一个非常大的java类Ĵ我是从https://github.com/drtimcooper/LatLongToTimezone复制的功能(静态方法)。

public class J { 
    public static String getValue((float) v) = ... 
}  

我已经在一个util类里面写了映射函数Util,它具有许多其他有用的函数,它们被映射函数调用。

=========

基本上我的码流是这样的:

case class C1(field1 : _, field2 : _, field3 : _, ...) 
case class C2(field1 : _, field2 : _, field3 : _, ...) 

// very large java class J that only contains static methods 
public class J { 
    public static String getValue((float) v) = ... 

    ... 
}  

object Util { 
    def m1(i: Int): Int = ... 

    def m2(l: Option[BigDecimal], l2: Option[BigDecimal]): Int = { 
     J.getValue(l.get, l2.get) 
    } 

    ... 

    def convert_C1_to_C2(c1: C1): C2 = { 
    C2(
     field1 = m1(c1.field1), 
     field2 = m2(c1.field2, c1.field3), 
     ... 
    } 
} 

dataframe.as[C1].map(Util.convert_C1_to_C2) 
    .mode(SaveMode.Overwrite) 
    .parquet("s3a://s3Path") 

有没有写这个更优化的方式吗?或者任何人都可以指出我做过这些事情的任何明显的错误?看着我的代码,我不知道为什么它要花很长时间才能完成任务。

我已经试过合并说16个分区来减少s3中的文件数量,但这似乎使作业运行速度慢得多。通常会有64个分区没有任何合并。

回答

0

您可能刚刚碰到包含elsewhere的slow-fakes-s3-rename问题。在那里讨论一些修复程序。

相关问题