2016-11-29 74 views
2

为什么我下面用pyspark写的计数器并不总是给我提供正确的结果,它与全球计数器有关吗?pyspark全球柜台

def increment_counter(): 
    global counter 
    counter += 1 

def get_number_of_element(rdd): 
    global counter 
    counter = 0 
    rdd.foreach(lambda x:increment_counter()) 
    return counter 

回答

4

您的全局变量只在驱动程序节点上定义,这意味着它将正常工作,直到您在本地主机上运行。 只要您将作业分配给多个进程,他们将无法访问变量counter,只需在其自己的进程中创建一个新的变量。所以最终的结果将只包含在驱动程序进程中完成的增量。

尽管如此,您正在寻找的是一个相当常见的用法,并且由Spark的累加器功能覆盖。在过程结束时分配和收集累加器,因此总计将包含所有节点的增量,而不是仅包含驱动程序节点。

Accumulators - Spark Programming Guide

+0

太好了!非常感谢! – xxx222