
Spark: equivalent of zipWithIndex in a DataFrame

Suppose I have the following DataFrame:

dummy_data = [('a',1),('b',25),('c',3),('d',8),('e',1)] 
df = sc.parallelize(dummy_data).toDF(['letter','number']) 

and I want to create the following DataFrame:

[('a',0),('b',2),('c',1),('d',3),('e',0)] 

What I do is convert it to an RDD, use the zipWithIndex function, and then join the result back:

convertDF = (df.select('number') 
       .distinct() 
       .rdd 
       .zipWithIndex() 
       .map(lambda x:(x[0].number,x[1])) 
       .toDF(['old','new'])) 


finalDF = (df 
      .join(convertDF,df.number == convertDF.old) 
      .select(df.letter,convertDF.new)) 

Is there a function similar to zipWithIndex on DataFrames? Is there another, more efficient way to accomplish this task?


http://stackoverflow.com/q/32760888/1560062 – zero323

Answer


Please check https://issues.apache.org/jira/browse/SPARK-23074 for direct parity of this functionality on DataFrames. Upvote that JIRA if you are interested in seeing this in Spark at some point.

Here is a workaround in PySpark, though:

from pyspark.sql.types import StructType, StructField, LongType

def dfZipWithIndex(df, offset=1, colName="rowId"):
    '''
    Enumerates dataframe rows in native order, like rdd.zipWithIndex(),
    but on a dataframe, and preserves its schema.

    :param df: source dataframe
    :param offset: adjustment to zipWithIndex()'s index
    :param colName: name of the index column
    '''

    # new index field in front, followed by the previous schema
    new_schema = StructType(
        [StructField(colName, LongType(), True)]
        + df.schema.fields
    )

    # zipWithIndex produces (Row, index) pairs
    zipped_rdd = df.rdd.zipWithIndex()

    # prepend the offset-adjusted index to each row's values
    # (indexing the pair explicitly; Python 3 lambdas cannot unpack tuples)
    new_rdd = zipped_rdd.map(lambda pair: [pair[1] + offset] + list(pair[0]))

    return spark.createDataFrame(new_rdd, new_schema)
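As a minimal usage sketch (assuming an active SparkSession named spark and the question's df are in scope; the column name idx and offset=0 are illustrative choices):

indexed = dfZipWithIndex(df, offset=0, colName="idx")
indexed.show()
# each original row gains a consecutive 0-based index in RDD order,
# e.g. the first row of df becomes (0, 'a', 1)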

This is also available in the abalon package.