2016-11-17 164 views
1

给定一个火花数据帧,其看起来像这样:保火花数据帧列分区

================================== 
| Name | Col1 | Col2 | .. | ColN | 
---------------------------------- 
| A | 1 | 11 | .. | 21 | 
| A | 31 | 41 | .. | 51 | 
| B | 2 | 12 | .. | 22 | 
| B | 32 | 42 | .. | 52 | 
================================== 

我想运行,其执行对对应于表中的一个分区的聚集/计算逻辑一个特定的值为Name。所述逻辑要求该分区的完整内容 - 并且该分区 - 在执行该逻辑的节点上的存储器中被物化;它看起来像下面的processSegment功能:

def processDataMatrix(dataMatrix): 
    # do some number crunching on a 2-D matrix 

def processSegment(dataIter): 
    # "running" value of the Name column in the iterator 
    dataName = None 
    # as the iterator is processed, put the data in a matrix 
    dataMatrix = [] 

    for dataTuple in dataIter: 
     # separate the name column from the other columns 
     (name, *values) = dataTuple 
     # SANITY CHECK: ensure that all rows have same name 
     if (dataName is None): 
      dataName = name 
     else: 
      assert (dataName == name), 'row name ' + str(name) + ' does not match expected ' + str(dataName) 

     # put the row in the matrix 
     dataMatrix.append(values) 

    # if any rows were processed, number-crunch the matrix 
    if (dataName is not None): 
     return processDataMatrix(dataMatrix) 
    else: 
     return [] 

我曾尝试通过基于Name列重新分区,然后在每个分区上运行processSegment,使这项工作通过mapPartitions底层RDD:

result = \ 
    stacksDF \ 
     .repartition('Name') \ 
     .rdd \ 
     .mapPartitions(processSegment) \ 
     .collect() 

然而,这一进程经常未能在processSegmentSANITY CHECK断言:

AssertionError: row name Q7 does not match expected A9 

当我试图在底层RDD上运行mapPartitions时,为什么在DataFrame上表面上执行的分区不会被保留?如果上述方法无效,是否有某种方法(使用DataFrame API或RDD API),这将使我能够对DataFrame分区的内存再现执行聚合逻辑?

(由于我使用PySpark,和特定的数字运算,逻辑我想要执行的是Python的,用户定义的聚合函数(UDAFs)would not appear to be an option。)

回答

1

我相信,你误会了如何划分工作。一般来说,partioner是一个满意的函数,而不是双射函数。尽管特定值的所有记录都将移至单个分区,但分区可能包含具有多个不同值的记录。

DataFrame API不给你在分区的任何控制,但也可以自定义partitionFunc定义使用RDD API时。这意味着你可以使用其中一个是双射,例如:

mapping = (df 
    .select("Name") 
    .distinct() 
    .rdd.flatMap(lambda x: x) 
    .zipWithIndex() 
    .collectAsMap()) 

def partitioner(x): 
    return mapping[x] 

和按如下方式使用它:

df.rdd.map(lambda row: (row.Name, row)).partitionBy(len(mapping), partitioner) 

虽然有可能你要记住,分区是不是免费的,如果数量独特的价值很大,它可能会成为一个严重的性能问题。