2015-08-17 42 views
1

我有这段代码,我正在使用pysparkipython中读取一个文件。我试图做的是添加一个片段,它基于从文件中读取的特定列形成列表,但是当我尝试执行它时,列表变为空,并且没有任何内容被附加到它。我的代码是:使用PySpark从地图创建全局列表的问题

list1 = [] 

def file_read(line): 

    list1.append(line[10]) 
    # bunch of other code which process other column indexes on `line` 

inputData = sc.textFile(fileName).zipWithIndex().filter(lambda (line,rownum): rownum>0).map(lambda (line, rownum): line) 

column_val = (inputData 
    .map(lambda line: line.split(",")) 
    .filter(lambda line: len(line) >1) 
    .map(file_read)) 

当我执行的代码,这部分list1还是对空,即使有,因为我在上面的相同功能的代码的其他部分使用它在line[10]的数据。看起来好像只是没有将它追加到列表中。我如何形成上面的列表?

+0

你的例子很不完整,只能推测。 list1在其他地方被清除了吗?您是否尝试过在append之前/之后添加打印语句,打印list1以及正在追加的内容? – barny

+0

@barny list1未被其他地方清除。我试图在'file_read()'函数内执行'print line [19]',但是当我调用它时什么都不会打印 –

回答

3

嗯,它实际上附加到list1,问题不在于你正在考虑的问题。序列化封闭中引用的每个变量并发送给工作人员。它也适用于list1

每个分区都会收到它自己的list1副本,当file_read被调用时,数据将被追加到该副本中,并且当给定的映射阶段结束时,它将超出范围并被丢弃。

不是特别优雅的代码,但你应该看到,这是真的这里发生了什么:

rdd = sc.parallelize(range(100), 5) 

line1 = [] 

def file_read(line): 
    list1.append(line) 
    print len(list1) 
    return line 

xs = rdd.map(file_read).collect() 

编辑

星火提供了两种类型的共享变量。 Broadcast variables(仅从工作人员透视图中读取)和accumulators(仅从驱动程序透视图中写入)。

默认情况下,累加器仅支持数字变量,并且主要用作计数器。虽然可以定义自定义累加器。要做到这一点,你必须扩展AccumulatorParam类,并提供定制zeroaddInPlace实现:

class ListParam(AccumulatorParam): 
    def zero(self, v): 
     return [] 
    def addInPlace(self, acc1, acc2): 
     acc1.extend(acc2) 
     return acc1 

接下来,您可以重新定义file_read如下:

def file_read1(line): 
    global list1 # Required otherwise the next line will fail 
    list1 += [line] 
    return line 

用法示例:

list1 = sc.accumulator([], ListParam()) 

rdd = sc.parallelize(range(10)).map(file_read1).collect() 
list1.value 

即使如果可以像这样使用蓄能器,那么在实践中使用它可能会很昂贵在最坏的情况下,它可能会导致驾驶员死亡。相反,你可以简单地使用另一个转变:

tmp = (inputData 
    .map(lambda line: line.split(",")) 
    .filter(lambda line: len(line) >1)) 

def line_read2(line): return ... # Just a core logic 

line1 = tmp.map(lambda line: line[10]) 
column_val = tmp.map(line_read2) 

旁注:您提供什么都不做

代码。 Spark中的转换只是描述了必须完成的工作,但在调用操作数据之前,没有任何操作是真正执行的。