2016-09-01 53 views
1

考虑放射性散布具有低于数据集 其中10000241是键,其余的是值reduceByKey在火花用于添加元组

('10000241',([0,0,1],[None,None,'RX'])) 
('10000241',([0,2,0],[None,'RX','RX'])) 
('10000241',([3,0,0],['RX',None,None])) 


pv1 = rdd.reduceBykey(lambda x,y :(
        addtup(x[0],y[0]), 
        addtup(x[1],y[1]), 
       )) 


def addtup(t1,t2): 
    j =() 
    for k,v in enumerate(t1): 
     j = j + (t1[k] + t2[k],) 
    return j 

最终输出我想是(10000241,(3,2,1)(” RX','RX','RX)) 但我得到的错误无法添加无类型的无类型或nonetype Str .how我可以克服这个问题?

+2

我不是当然,为什么这是一个Spark问题,但在标准Python中给出了这三个元素,那么如何将RX字符串与None结合起来呢?这是错误 –

+0

另外你想要做的事情让我想起SQL中的COALESCE运算符,除了你有0和None而不是null –

回答

1

如果我正确地理解了你,你想总结第一个元组中的数字并在第二个元组中使用逻辑?

我想你应该重写你的函数如下:

def addtup(t1,t2): 
    left = list(map(lambda x: sum(x), zip(t1[0], t2[0]))) 
    right = list(map(lambda x: x[0] or x[1], zip(t1[1], t2[1]))) 
    return (left, right) 

然后你可以使用它像这样:

rdd.reduceBykey(addtup) 

这里是一个示范

import functools 

data = (([0,0,1],[None,None,'RX']), 
([0,2,0],[None,'RX','RX']), 
([3,0,0],['RX',None,None])) 


functools.reduce(addtup, data) 
#=> ([3, 2, 1], ['RX', 'RX', 'RX']) 
+0

感谢您的解决方案,但我尝试了不同的解决方案,因为我想使用spark 。谢谢你的努力!。 – Abid

+0

@解决方案是为火花:) – fl00r