2017-04-13 106 views
0

我的输入csv数据,某些行包含重复的字段或一些缺少的字段,从这些数据我想从每行删除重复的字段,然后所有的行应包含所有的字段,与值为NULL的地方是不包含字段的地方。从输入数据中删除RDD中的重复字段

+0

你尝试过什么吗?你有什么具体问题,或者你只是在寻找某人向你展示解决方案? – Pushkr

+0

@Pushkr,我试着这样做,通过遍历每一行并获取基于Key的字段,如果该键重复再次忽略它,并且如果没有任何关键字将其填充为null。但我的数据集是巨大的,有时也可以连续30到40个重复的K:V对......所以遍历所有的字段和行对性能而言是一个糟糕的设计......所以想要关于方法的建议。 – user491

回答

0

试试这个:

def transform(line): 
    """ 
    >>> s = 'id:111|name:dave|age:33|city:london' 
    >>> transform(s) 
    ('id:111', {'age': '33', 'name': 'dave', 'city': 'london'}) 
    """ 
    bits = line.split("|") 
    key = bits[0] 
    pairs = [v.split(":") for v in bits[1:]] 
    return key, {kv[0].strip(): kv[1].strip() for kv in pairs if len(kv) == 2} 

rdd = (sc 
    .textFile("/tmp/sample") 
    .map(transform)) 

查找键:

from operator import attrgetter 

keys = rdd.values().flatMap(lambda d: d.keys()).distinct().collect() 

创建数据帧:

df = rdd.toDF(["id", "map"]) 

,拓展:

df.select(["id"] + [df["map"][k] for k in keys]).show() 
0

所以我假设你已经从文本文件中获得了rdd。我创建了一个在这里:

rdd = spark.sparkContext.parallelize([(u'id:111', u'name:dave', u'dept:marketing', u'age:33', u'city:london'), 
(u'id:123', u'name:jhon', u'dept:hr', u'city:newyork'), 
(u'id:100', u'name:peter', u'dept:marketing', u'name:peter', u'age:30', u'city:london'), 
(u'id:222', u'name:smith', u'dept:finance', u'city:boston'), 
(u'id:234', u'name:peter', u'dept:service', u'name:peter', u'dept:service', u'age:32', u'city:richmond')]) 

我只是使函数映射rddkeyvalue对,并同时删除重复的一个

from pyspark.sql import Row 
from pyspark.sql.types import * 

def split_to_dict(l): 
    l = list(set(l)) # drop duplicate here 
    kv_list = [] 
    for e in l: 
     k, v = e.split(':') 
     kv_list.append({'key': k, 'value': v}) 
    return kv_list 

rdd_map = rdd.flatMap(lambda l: split_to_dict(l)).map(lambda x: Row(**x)) 
df = rdd_map.toDF() 

输出前5行的例如

+----+---------+ 
| key| value| 
+----+---------+ 
|city| london| 
|dept|marketing| 
|name|  dave| 
| age|  33| 
| id|  111| 
+----+---------+