2017-08-22 120 views
0

Pyspark: change nested column data type

How do I change the data type of a nested column in Pyspark? For example, how do I change the data type of value from string to int?

Reference: how to change a Dataframe column from String type to Double type in pyspark

{ 
    "x": "12", 
    "y": { 
     "p": { 
      "name": "abc", 
      "value": "10" 
     }, 
     "q": { 
      "name": "pqr", 
      "value": "20" 
     } 
    } 
} 
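
For a flat, top-level column, the cast technique from the referenced question is enough; a minimal sketch, assuming a DataFrame df loaded from the JSON above:

from pyspark.sql.functions import col 

# casts the top-level string column x to an integer type; nested fields 
# such as y.p.value cannot be rewritten in place this way, which is what 
# this question is about 
df = df.withColumn("x", col("x").cast("int")) 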
+0

Does this change need to be persistent, i.e. saved back to the JSON file? Or do you only need the precision while performing operations? – diek

+0

@diek It needs to be written to a json file –

Answers

2

You can read the JSON data using:

from pyspark.sql import SQLContext 

sqlContext = SQLContext(sc) 
data_df = sqlContext.read.json("data.json", multiLine = True) 

data_df.printSchema() 

Output

root 
|-- x: long (nullable = true) 
|-- y: struct (nullable = true) 
| |-- p: struct (nullable = true) 
| | |-- name: string (nullable = true) 
| | |-- value: long (nullable = true) 
| |-- q: struct (nullable = true) 
| | |-- name: string (nullable = true) 
| | |-- value: long (nullable = true) 

Now you can access the data from the y column:

data_df.select("y.p.name") 
data_df.select("y.p.value") 

Output

abc, 10 
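A select-time cast such as col("y.p.value").cast("long") would convert the values it returns, but it does not change the schema of the stored y column. For a fixed layout like this one, the struct could also be rebuilt with casts and no UDF; a sketch (an alternative, not the approach taken below):

from pyspark.sql.functions import col, struct 

# rebuild y with the two value fields cast to long 
df_cast = data_df.withColumn("y", struct( 
    struct(col("y.p.name").alias("name"), 
      col("y.p.value").cast("long").alias("value")).alias("p"), 
    struct(col("y.q.name").alias("name"), 
      col("y.q.value").cast("long").alias("value")).alias("q"))) 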

OK, so the solution is to add a new nested column with the correct schema and drop the column with the wrong schema:

from pyspark.sql.functions import udf 
from pyspark.sql import Row 
from pyspark.sql.types import StructType 

df3 = spark.read.json("data.json", multiLine = True) 

# create correct schema from old 
c = df3.schema['y'].jsonValue() 
c['name'] = 'z' 
c['type']['fields'][0]['type']['fields'][1]['type'] = 'long' 
c['type']['fields'][1]['type']['fields'][1]['type'] = 'long' 

y_schema = StructType.fromJson(c['type']) 
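
# for reference, y_schema is equivalent to writing the schema out by hand 
# with StructType/StructField and LongType for the two value fields; the 
# jsonValue round trip above just avoids spelling that out 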

# define a udf to populate the new column. Rows are immutable so you 
# have to build the new value from scratch. 

def foo(row): 
    d = row.asDict() 
    y = {} 
    y["p"] = {} 
    y["p"]["name"] = d["p"]["name"] 
    y["p"]["value"] = int(d["p"]["value"]) 
    y["q"] = {} 
    y["q"]["name"] = d["q"]["name"] 
    y["q"]["value"] = int(d["p"]["value"]) 

    return(y) 
map_foo = udf(foo, y_schema) 

# add the column 
df3_new = df3.withColumn("z", map_foo("y")) 

# delete the column 
df4 = df3_new.drop("y") 


df4.printSchema() 

Output

root 
|-- x: long (nullable = true) 
|-- z: struct (nullable = true) 
| |-- p: struct (nullable = true) 
| | |-- name: string (nullable = true) 
| | |-- value: long (nullable = true) 
| |-- q: struct (nullable = true) 
| | |-- name: string (nullable = true) 
| | |-- value: long (nullable = true) 


df4.show() 

Output

+---+-------------------+ 
| x|     z| 
+---+-------------------+ 
| 12|[[abc,10],[pqr,20]]| 
+---+-------------------+ 
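
Since the comments above note that the result has to be written back to a JSON file, a minimal sketch (the output path is an assumption):

df4.write.mode("overwrite").json("data_fixed.json") 

Note that DataFrameWriter.json writes one JSON object per line across part files, not a single pretty-printed document.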
+0

@aswinids I have edited the question. Any thoughts on this? –

+0

@aswinids: Thanks for the help. Do we have decimal/timestamp data types in the json schema? –

+0

@aswinids: If I change the value 10 to "10" and use type: 'long', then I get null –

0

Using arbitrary variable names may seem simple, but it is problematic and goes against PEP 8. And when dealing with numbers, I suggest avoiding the names commonly used while iterating over these structures, i.e. value.

import json 

with open('random.json') as json_file: 
    data = json.load(json_file) 

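# walk the nested y structure and cast each 'value' field to float 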
for k, v in data.items(): 
    if k == 'y': 
     for key, item in v.items(): 
      item['value'] = float(item['value']) 


print(type(data['y']['p']['value'])) 
print(type(data['y']['q']['value'])) 
# mac → python3 make_float.py 
# <class 'float'> 
# <class 'float'> 
json_data = json.dumps(data, indent=4, sort_keys=True) 
with open('random.json', 'w') as json_file: 
    json_file.write(json_data) 

The output json file:
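
Reconstructed from the sample input and the code above, the rewritten file would look like:

{ 
    "x": "12", 
    "y": { 
        "p": { 
            "name": "abc", 
            "value": 10.0 
        }, 
        "q": { 
            "name": "pqr", 
            "value": 20.0 
        } 
    } 
} 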

+0

The key part of this question is that we generate around 60GB of data every day, and we need to ensure scalability, which is why Spark is the way to go –

+0

Of course this can't handle such a huge volume of data. Why didn't the question you referenced work? Here is the example worked from the documentation they gave: https://ghostbin.com/paste/wt5y6 – diek