2016-05-17 40 views
4

我想在pySpark mllib中构建一个简单的自定义估算器。我有here,它可以写一个自定义的变压器,但我不知道如何在Estimator上做到这一点。我也不明白什么@keyword_only做什么,为什么我需要这么多的制定者和获得者。 Scikit学习似乎有定制机型see here适当的文件(但pySpark不如何在PySpark中自定义估算器mllib

伪为例型号代码:

class NormalDeviation(): 
    def __init__(self, threshold = 3): 
    def fit(x, y=None): 
     self.model = {'mean': x.mean(), 'std': x.std()] 
    def predict(x): 
     return ((x-self.model['mean']) > self.threshold * self.model['std']) 
    def decision_function(x): # does ml-lib support this? 

回答

9

一般来说没有文档,因为作为星火1.6/2.0最相关的API并不打算是公共的,应在星火2.1.0(见SPARK-7146)更改。

API是比较复杂的,因为它必须遵循特定的惯例,以使给定TransformerEstimator兼容与Pipeline API。这些方法中的一些可能是读写和网格搜索等功能所必需的。其他,如keyword_only只是一个简单的帮手,而不是严格要求。

假设您已经定义了以下的配料插件均值参数:

from pyspark.ml.pipeline import Estimator, Model, Pipeline 
from pyspark.ml.param.shared import * 
from pyspark.sql.functions import avg, stddev_samp 


class HasMean(Params): 

    mean = Param(Params._dummy(), "mean", "mean", 
     typeConverter=TypeConverters.toFloat) 

    def __init__(self): 
     super(HasMean, self).__init__() 

    def setMean(self, value): 
     return self._set(mean=value) 

    def getMean(self): 
     return self.getOrDefault(self.mean) 

标准偏差参数:

class HasStandardDeviation(Params): 

    stddev = Param(Params._dummy(), "stddev", "stddev", 
     typeConverter=TypeConverters.toFloat) 

    def __init__(self): 
     super(HasStandardDeviation, self).__init__() 

    def setStddev(self, value): 
     return self._set(stddev=value) 

    def getStddev(self): 
     return self.getOrDefault(self.stddev) 

和门槛:

class HasCenteredThreshold(Params): 

    centered_threshold = Param(Params._dummy(), 
      "centered_threshold", "centered_threshold", 
      typeConverter=TypeConverters.toFloat) 

    def __init__(self): 
     super(HasCenteredThreshold, self).__init__() 

    def setCenteredThreshold(self, value): 
     return self._set(centered_threshold=value) 

    def getCenteredThreshold(self): 
     return self.getOrDefault(self.centered_threshold) 

您可以创建基本Estimator为如下:

class NormalDeviation(Estimator, HasInputCol, 
     HasPredictionCol, HasCenteredThreshold): 

    def _fit(self, dataset): 
     c = self.getInputCol() 
     mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first() 
     return (NormalDeviationModel() 
      .setInputCol(c) 
      .setMean(mu) 
      .setStddev(sigma) 
      .setCenteredThreshold(self.getCenteredThreshold()) 
      .setPredictionCol(self.getPredictionCol())) 

class NormalDeviationModel(Model, HasInputCol, HasPredictionCol, 
     HasMean, HasStandardDeviation, HasCenteredThreshold): 

    def _transform(self, dataset): 
     x = self.getInputCol() 
     y = self.getPredictionCol() 
     threshold = self.getCenteredThreshold() 
     mu = self.getMean() 
     sigma = self.getStddev() 

     return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma) 

最后,可以使用如下:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"]) 

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0) 
model = Pipeline(stages=[normal_deviation]).fit(df) 

model.transform(df).show() 
## +---+----+----------+ 
## | id| x|prediction| 
## +---+----+----------+ 
## | 1| 2.0|  false| 
## | 2| 3.0|  false| 
## | 3| 0.0|  false| 
## | 4|99.0|  true| 
## +---+----+----------+ 
+0

的感谢!所以Estimator的状态也被认为是一个参数? –

+0

您是否将估算器的参数调整为模型参数?如果是这样,这种设计方式很方便,但对于基本实现来说并不难。 – zero323

+0

好的,任何希望得到一些关于如何坚持像这样的自定义步骤的建议? –

相关问题