1
首先,我理解持久数据结构与关于RDD的..更新的理念和永恒更新字典的PySpark RDD是我能想到的:)如何基于某些条件
我的问题是只有一个字:
给定字典(或Row对象)的RDD,我如何循环/映射并在该RDD上应用某些转换登录并接收应用了这些转换的新RDD。例如:
给出一个包含有一个RDD词典:
fbb = sc.parallelize(
[{'amount_gbp': -43.33,
'balance_gbp': 57.08,
'type': 'GED',
'id': 961690979,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0},
{'amount_gbp': 17.08,
'balance_gbp': 40.0,
'type': 'OIP',
'id': 962182953,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0}])
我试图以应用功能:
def update_virtual_cash_balance(x):
x.update({'virtual_cash_balance': x['amount_gbp'] + x['balance_gbp']}) if x['type'] == 'GED' else x
fbb.map(lambda x: update_virtual_cash_balance(x)).collect()
和预期:
[{'amount_gbp': -43.33,
'balance_gbp': 57.08,
'type': 'GED',
'id': 961690979,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 13.75,
'virtual_debt_balance': 0},
{'amount_gbp': 17.08,
'balance_gbp': 40.0,
'type': 'OIP',
'id': 962182953,
'settled_jrnl_cr_datetime': u'(null)',
'virtual_cash_balance': 0,
'virtual_debt_balance': 0}]
不过的了:
Out[411]: [None, None]
任何帮助我误解的东西都会很棒。
那不是我谁downvoted的帮助@LostInOverflow谢谢 – adebesin