2017-09-13 17 views
1

说我有以下数据:如何将列添加到Spark中的爆炸结构?

{"id":1, "payload":[{"foo":1, "lol":2},{"foo":2, "lol":2}]} 

我想爆炸载荷和列添加到它,就像这样:

df = df.select('id', F.explode('payload').alias('data')) 
df = df.withColumn('data.bar', F.col('data.foo') * 2) 

然而,这导致与三列的数据帧:

  • id
  • data
  • data.bar

我预计data.bardata结构的一部分......

我怎样才能添加一列分解结构,而不是添加顶层列?

+1

您必须重建模式,使用'select'或使用'udf'来修改数据 - 几乎所有这些选项都包含在这里:https://stackoverflow.com/questions/31615657/ how to add-a-new-struct-column-to-a-dataframe –

+0

[如何向DataFrame添加新的Struct列]可能的副本(https://stackoverflow.com/questions/31615657/how-添加新的结构列到数据框) –

回答

1
df = df.withColumn('data', f.struct(
    df['data']['foo'].alias('foo'), 
    (df['data']['foo'] * 2).alias('bar') 
)) 

这将导致:

root 
|-- id: long (nullable = true) 
|-- data: struct (nullable = false) 
| |-- col1: long (nullable = true) 
| |-- bar: long (nullable = true) 

UPDATE

def func(x): 
    tmp = x.asDict() 
    tmp['foo'] = tmp.get('foo', 0) * 100 
    res = zip(*tmp.items()) 
    return Row(*res[0])(*res[1]) 

df = df.withColumn('data', f.UserDefinedFunction(func, StructType(
    [StructField('foo', StringType()), StructField('lol', StringType())]))(df['data'])) 

附:

Spark几乎不支持inplace opreation。

所以你每次想要做就位,你需要做实际取代

+0

这绝对是朝着正确的方向发展!有没有办法做到这一点,而不知道“数据”的内容(当然除了'data.foo')?我编辑了我的问题,添加了一个额外的'data.lol'列来使这个更清晰。 – surjikal