2016-11-19 24 views
1

我想为火花的Pipelines写一个自定义Estimator。它应该执行数据清理任务。这意味着一些行将被删除,一些列被删除,一些列被添加,一些值在现有的列中被替换。 IT还应该将某些数字列的平均值或最小值存储为NaN替换值。火花自定义预处理估算器

然而,

override def transformSchema(schema: StructType): StructType = { 
    schema.add(StructField("foo", IntegerType)) 
} 

仅支持加入域? 我很好奇我该如何处理这个问题。

回答

2

你是正确的,只有添加字段被StructField api支持。但是,这并不意味着你也不能删除字段!

StructType有一个值成员fields,它给你一个Array[StructField]。你可以.filter()这个数组,但你认为合适的(通过name,dataType,或更复杂的东西),只保留你想要的列。

一旦你完成了你的过滤,你有两个选择:

  1. 添加StructField每个新列过滤fields阵列,构建从这个
  2. 一个StructTypefields构建StructType数组并使用.add(...)添加新列。
+0

您是否知道在添加新列时是否会影响性能? –

+0

我不知道性能成本肯定会是什么,但总的来说,这将是新计算的计算成本的函数。只要你的工作是按照火花的性质分配的,你应该没问题。 请记住,数据帧操作是懒惰地执行的,并且在使用数据帧API时,催化剂优化器将尽最大努力优化您的操作。不要害怕在数据框中出现很多很多列。 –