2017-02-13 74 views
0

我正在尝试使用Spark(在Scala中)做些事情。我有一个Transformer类,看起来像这样:映射到Spark中的RDD的方法

class Transformer(transformerParameters: TransformerParameters) { 
    // Process the parameters 

    def transform(element: String): String = { 
    // Do stuff 
    } 
} 

我想这样做

val originalRDD = sc.textFile("blah") 
val transformer = new Transformer(parameters) 
val transformedRDD = originalRDD.map(transformer.transform) 

假设我不想或不能使Transformer类序列化,并进一步假设TransformerParameters其实是序列化的,我见过的人建议编写,而不是(或者我可能误解):

val transformedRDD = originalRDD.map(new Transformer(parameters).transform) 

我很科幻ne在集群的每个JVM上创建一个新的Transformer实例,但看起来像这样会为每一行创建一个新的Transformer,这看起来没有必要且可能非常昂贵。这究竟是什么?有没有办法为每一行创建一个新实例?

谢谢!

回答

2

您可以广播(隐式或显式)具有参数字段的对象,以及对Transformer的瞬态字段引用。

对此对象有一个方法来委托在Transformer上进行变换,但首先对Transformer进行延迟初始化(检查Transformer引用是否已初始化,如果不是用参数创建一个,则调用transform) 。

然后在map方法中,调用wrapper.transform而不是Transformer.transform - 这节省了每次调用时创建的对象,并解决了序列化问题,因为每个任务都获得它自己的包装器实例,因此它将自己的Transformer得到重用。

+0

这很有道理。这是一种正常的做事方式,还是我想做一些非常单一的事情? – zale

+0

我相信是这样,至少在Spark中(以及主要来自Java背景的人) - 这就是我在这种情况下所做的。最有可能采用更灵活的方式来做到这一点,但我还没有发现它! –

相关问题