0
我有这样的数据,滤波RDD并且提取匹配的火花蟒数据
cl_id cn_id cn_value
10004, 77173296 ,390.0
10004, 77173299 ,376.0
10004, 77173300 ,0.0
20005, 77173296 ,0.0
20005, 77173299 ,6.0
2005, 77438800 ,2.0
Cl_id的ID:10004,20005
过滤器的10004
10004, 77173296 ,390.0
10004, 77173299 ,376.0
过滤器的20005
20005, 77173296 ,0.0
20005, 77173299 ,6.0
Now我想返回RDD一样,
10004,cn_id,x1(77173296.value,77173300.value) ==> 10004,77173296,390.0,376.0
20005,cn_id,x1(77173296.value,77173300.value) ==> 20005,77173296,0.0,6.0
而且我想在此return_RDD
执行一些操作:
def cal_for(rdd_list):
#list.map(position1).filter(cn_id for this formula)-> calculate that formula -> store in a separate RDD -> Return that RDD
rdd_list = rdd_list.map(lambda line:line.split(','))
new_list = rdd_list.map(lambda x: (x[0]+', '+x[1],float(x[2])))
new_list = rdd_list.filter(lambda x: x[1] == '77173296' && x[1] == '77173299')
## then get the RDD containing respective cn_values for cn_id 77173296 & cn_id 77173299
## and apply the following formula whre a=77173296.value b=77173299.value for cl_id 1004
try:
# want to process RDD with this Formula
return ((float(a)/float(a+b))*100)
except ZeroDivisionError:
return 0
#return or save cal_RDD
由于我是新来的这将面临我的问题。并从输入Rdd我只是想过滤基于cl_id的结果,并将proccesed Rdd传递给cal_fo() –
您可能想了解更多关于Scala和Spark的内容。例如,您不需要返回关键字,并且在您拥有RDD后操作数据相对简单。我不完全确定你的问题是Spark还是希望有人为你编写逻辑。我想你应该描述你希望达到的目标(没有提到实际值)。 –
真的非常感谢Steven。我会更加努力并试图简化和实施它。非常感谢。 –