2017-07-20 40 views
1

我已经写了一个示例函数,使用python中的spark。的功能如下降低跨分区火花的功能pyspark

#!/usr/bin/env python 
from __future__ import print_function 
from pyspark.sql import SparkSession 
import os 
import sys 
os.environ["SPARK_HOME"] = "/usr/local/spark" 
os.environ["PYSPARK_PYTHON"]="/usr/bin/python3.4" 

spark = SparkSession \ 
    .builder \ 
    .appName("testpython") \ 
    .getOrCreate() 
rdd1 = spark.sparkContext.parallelize([1,6,5,2,99,1000,100009,10000,139,44,45343,23234,34]) 
**rdd3=rdd1.reduce(lambda x,y:x+1)** 
print(rdd3) 

在降低功能我们已经给出(拉姆达X,Y:X + 1)的理想的结果必须是13,用于上述功能,但结果来作为6

任何人都可以解释为什么结果是6而不是13? 是否因为火花上的分区间的数据剖视?

控制台输出:

/usr/bin/python3.4 /home/PycharmProjects/sampleproject/ttestexmple.py 用放电的默认log4j的配置文件:组织/阿帕奇/火花/ log4j-defaults.properties 默认设置日志级别改为“WARN”。 要调整日志记录级别,请使用sc.setLogLevel(newLevel)。对于SparkR,使用setLogLevel(newLevel)。 17/07/20 17点45分14秒WARN NativeCodeLoader:无法加载原生的Hadoop 17/07/20 17点45分14秒WARN utils的:设置SPARK_LOCAL_IP如果您需要绑定到另一个地址

处理完成退出代码0

+0

它是什么你想实现使用减少? – sau

+0

我想实现第一个memeber加上数量的元素可以说没有使用任何默认火花像计数或任何内部我想第一个数字rdd +(元素数) –

+0

我已经提供了答案。但是我希望你知道这不是减少应用的方式。我不知道为什么你不想使用默认功能。 – sau

回答

2

是的你是绝对正确的。您可以确保您的rdd仅使用1个位置

rdd1 = rdd1.coalesce(1) 
rdd2 = rdd1.reduce(lambda x,y: x+1) 

现在您将得到预期的答案。

原因是你的rdd有多个分区,而你正试图使用​​一个根本不使用y的reduce。因此,让我们说你的rdd有两个分区,所以你的减少得到像这样的(reduce on partition 1, reduce on partition 2),最后它会给你reduce result on partion 1 + 1

+0

将整个数据放入一个分区时性能是否会降低? –

+0

分布式计算的好理念有赖于分布式数据。为了您的任务的特定目的,您应该尝试提出更有效的解决方案。我仍不清楚你想达到什么目标。 – sau

+1

@svsteja也可以接受答案,如果这可以解决您的问题陈述。 – sau