2017-04-25 173 views
1

我有一个字典类型RDD我:PySpark:迭代过字典类型RDD

>>> a.collect() 

[{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}]

只是为了检查:

>>> a.map(lambda x:type(x)).collect() 

[< type 'dict' >]

但是我不能遍历通过使用map()的字典型RDD。我想:

>>> a.map(lambda x:(k,v) for k,v in x.iteritems()) 

要我说出惊讶它导致错误:

Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
NameError: name 'x' is not defined 

我错过任何重要的一点在这里。

编辑:代码是所有权利限制与发电机的语法小bug正确的代码应该是:

a.map(lambda x:[(k,v) for k,v in x.iteritems()]) 

回答

1

我尝试这样做:

data = [{(1155718, 105): 14, (1155718, 1887): 2, (1155718, 1930): 12, (1155718, 927): 6, (1155718, 2783): 8, (1155718, 738): 4, 
     (1155718, 952): 4, (1155718, 1196): 6, (1155718, 997): 4, (1155718, 2904): 38}] 

rdd = sc.parallelize(data) 
rdd.flatMap(lambda _: [(k,v) for (k,v) in _.items()]).collect() 

,并得到这个:

[((1155718, 105), 14), 
((1155718, 738), 4), 
((1155718, 2904), 38), 
((1155718, 1887), 2), 
((1155718, 1196), 6), 
((1155718, 1930), 12), 
((1155718, 927), 6), 
((1155718, 2783), 8), 
((1155718, 997), 4), 
((1155718, 952), 4)] 
+0

你实际上是我的代码应该是'a.map(lambda x:[(k,v)for k,v in x.iteritems()])' – abhiieor

+0

只需要在这里注意:对于Python 3,使用'items()'。对于Python 2,使用'iteritems()' – titipata