0
我的输入csv数据,某些行包含重复的字段或一些缺少的字段,从这些数据我想从每行删除重复的字段,然后所有的行应包含所有的字段,与值为NULL的地方是不包含字段的地方。从输入数据中删除RDD中的重复字段
我的输入csv数据,某些行包含重复的字段或一些缺少的字段,从这些数据我想从每行删除重复的字段,然后所有的行应包含所有的字段,与值为NULL的地方是不包含字段的地方。从输入数据中删除RDD中的重复字段
试试这个:
def transform(line):
"""
>>> s = 'id:111|name:dave|age:33|city:london'
>>> transform(s)
('id:111', {'age': '33', 'name': 'dave', 'city': 'london'})
"""
bits = line.split("|")
key = bits[0]
pairs = [v.split(":") for v in bits[1:]]
return key, {kv[0].strip(): kv[1].strip() for kv in pairs if len(kv) == 2}
rdd = (sc
.textFile("/tmp/sample")
.map(transform))
查找键:
from operator import attrgetter
keys = rdd.values().flatMap(lambda d: d.keys()).distinct().collect()
创建数据帧:
df = rdd.toDF(["id", "map"])
,拓展:
df.select(["id"] + [df["map"][k] for k in keys]).show()
所以我假设你已经从文本文件中获得了rdd
。我创建了一个在这里:
rdd = spark.sparkContext.parallelize([(u'id:111', u'name:dave', u'dept:marketing', u'age:33', u'city:london'),
(u'id:123', u'name:jhon', u'dept:hr', u'city:newyork'),
(u'id:100', u'name:peter', u'dept:marketing', u'name:peter', u'age:30', u'city:london'),
(u'id:222', u'name:smith', u'dept:finance', u'city:boston'),
(u'id:234', u'name:peter', u'dept:service', u'name:peter', u'dept:service', u'age:32', u'city:richmond')])
我只是使函数映射rdd
到key
和value
对,并同时删除重复的一个
from pyspark.sql import Row
from pyspark.sql.types import *
def split_to_dict(l):
l = list(set(l)) # drop duplicate here
kv_list = []
for e in l:
k, v = e.split(':')
kv_list.append({'key': k, 'value': v})
return kv_list
rdd_map = rdd.flatMap(lambda l: split_to_dict(l)).map(lambda x: Row(**x))
df = rdd_map.toDF()
输出前5行的例如
+----+---------+
| key| value|
+----+---------+
|city| london|
|dept|marketing|
|name| dave|
| age| 33|
| id| 111|
+----+---------+
你尝试过什么吗?你有什么具体问题,或者你只是在寻找某人向你展示解决方案? – Pushkr
@Pushkr,我试着这样做,通过遍历每一行并获取基于Key的字段,如果该键重复再次忽略它,并且如果没有任何关键字将其填充为null。但我的数据集是巨大的,有时也可以连续30到40个重复的K:V对......所以遍历所有的字段和行对性能而言是一个糟糕的设计......所以想要关于方法的建议。 – user491