2016-07-29 204 views
1

首先,我理解持久数据结构与关于RDD的..更新的理念和永恒更新字典的PySpark RDD是我能想到的:)如何基于某些条件

我的问题是只有一个字:

给定字典(或Row对象)的RDD,我如何循环/映射并在该RDD上应用某些转换登录并接收应用了这些转换的新RDD。例如:

给出一个包含有一个RDD词典:

fbb = sc.parallelize(
    [{'amount_gbp': -43.33, 
     'balance_gbp': 57.08, 
     'type': 'GED', 
     'id': 961690979, 
     'settled_jrnl_cr_datetime': u'(null)', 
     'virtual_cash_balance': 0, 
     'virtual_debt_balance': 0}, 
    {'amount_gbp': 17.08, 
     'balance_gbp': 40.0, 
     'type': 'OIP', 
     'id': 962182953, 
     'settled_jrnl_cr_datetime': u'(null)', 
     'virtual_cash_balance': 0, 
     'virtual_debt_balance': 0}]) 

我试图以应用功能:

def update_virtual_cash_balance(x): 
    x.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']}) if x['type'] == 'GED' else x 

    fbb.map(lambda x: update_virtual_cash_balance(x)).collect() 

和预期:

[{'amount_gbp': -43.33, 
    'balance_gbp': 57.08, 
    'type': 'GED', 
    'id': 961690979, 
    'settled_jrnl_cr_datetime': u'(null)', 
    'virtual_cash_balance': 13.75, 
    'virtual_debt_balance': 0}, 
{'amount_gbp': 17.08, 
    'balance_gbp': 40.0, 
    'type': 'OIP', 
    'id': 962182953, 
    'settled_jrnl_cr_datetime': u'(null)', 
    'virtual_cash_balance': 0, 
    'virtual_debt_balance': 0}] 

不过的了:

Out[411]: [None, None] 

任何帮助我误解的东西都会很棒。

回答

1
  • update_virtual_cash_balance不返回任何东西,所以你得到None
  • update方法不返回任何东西,所以你会得到None即使update_virtual_cash_balance返回值
  • 你不应该在的地方修改数据。 RDD是不可变的,变异的数据可能会产生不良影响。

尝试:

def update_virtual_cash_balance(x): 
    if x['type'] == 'GED': 
     z = x.copy() # shallow copy should be enough here 
     z.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']} 
     return z 
    return x 
+1

那不是我谁downvoted的帮助@LostInOverflow谢谢 – adebesin