Spark: iterating over an RDD grouped by key

I have to develop a Spark application, and I am stuck on Spark 1.3, so I cannot use window functions. I chose to iterate over groups of elements, creating the groups by RDD key. The solution I have found so far is to collect the keys and then fetch the values for each key with lookup(key). I know my approach is inefficient, but I don't know how else to apply a function that turns each group into a list and returns another list.
logon_dhcp = logons.map(lambda logon: (logon.user, (logon.dhcpscopename, logon.city, logon.timestamp)))
logon_dhcp = logon_dhcp.groupByKey()

dhcp_change_list = []
for key in logon_dhcp.keys().collect():
    new_list = dhcp_changed(key, logon_dhcp.lookup(key))
    dhcp_change_list = list(set().union(dhcp_change_list, new_list))
def dhcp_changed(key, group):
    # lookup(key) returns a list holding one iterable of values, hence group[0]
    values = list(group[0])
    values_sorted = sorted(values, key=lambda tup: tup[2])
    prevCity = None
    prevValue = None
    prevTime = None
    res = list()
    for value in values_sorted:
        if prevCity is not None and prevCity != value[1] and notEnoughTime(prevTime, value[2]):
            res.append((key, prevTime.strftime('%Y-%m-%d %H:%M:%S'), prevCity,
                        value[2].strftime('%Y-%m-%d %H:%M:%S'), value[1]))
        prevCity = value[1]
        prevTime = value[2]
        prevValue = value
    return res
How can I do the same thing with aggregateByKey()?
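For context, here is a minimal sketch of how the driver-side collect-and-lookup loop could become a single distributed pass: keep the per-key comparison as a plain Python function and hand it to flatMap, either after groupByKey() or after rebuilding the per-key lists with aggregateByKey(). The question does not show notEnoughTime, so the one-hour threshold in not_enough_time below is an assumption, as are the sample field values; the Spark calls are shown as comments since they need a SparkContext.

```python
from datetime import datetime, timedelta

def not_enough_time(prev_time, cur_time, max_gap=timedelta(hours=1)):
    # Hypothetical stand-in for the notEnoughTime helper from the question:
    # flag city changes that happen within max_gap of each other.
    return cur_time - prev_time < max_gap

def dhcp_changed(key, values):
    # Pure per-key function: values is any iterable of
    # (dhcpscopename, city, timestamp) tuples for one user.
    values_sorted = sorted(values, key=lambda tup: tup[2])
    prev_city = None
    prev_time = None
    res = []
    for value in values_sorted:
        if prev_city is not None and prev_city != value[1] and not_enough_time(prev_time, value[2]):
            res.append((key,
                        prev_time.strftime('%Y-%m-%d %H:%M:%S'), prev_city,
                        value[2].strftime('%Y-%m-%d %H:%M:%S'), value[1]))
        prev_city = value[1]
        prev_time = value[2]
    return res

# On the cluster, the driver loop becomes one distributed pass (sketch):
# dhcp_change_list = (logon_dhcp
#                     .groupByKey()
#                     .flatMap(lambda kv: dhcp_changed(kv[0], kv[1]))
#                     .distinct()
#                     .collect())
#
# With aggregateByKey, the per-key lists are built explicitly instead:
# grouped = logon_dhcp.aggregateByKey(
#     [],                        # zero value: empty list per key
#     lambda acc, v: acc + [v],  # seqFunc: fold one value into a partition-local list
#     lambda a, b: a + b)        # combFunc: merge partial lists across partitions
# dhcp_change_list = (grouped
#                     .flatMap(lambda kv: dhcp_changed(kv[0], kv[1]))
#                     .distinct()
#                     .collect())
```

Since dhcp_changed no longer unwraps group[0], the same function works with both variants; only the way the per-key iterable is produced changes.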