2016-05-14 70 views
0

我想将存储在pyspark.sql.dataframe.DataFrameddf”中的事务按指示事务源(在本例中为客户ID)的列“key”进行分组。将Python Spark组事务转换为嵌套模式

分组是一个相当昂贵的过程,所以我想写组磁盘嵌套模式:

(key, [[c1, c2, c3,...], ...]) 

这将让我迅速地加载上的一个键的所有交易,并开发复杂自定义聚合器,而无需重新运行分组。

如何创建嵌套模式并将其写入磁盘?

回答

0

我花了相当长的一段时间才弄清楚,虽然答案很简单,所以我想我会在这里发布我的解决方案。

首先由key(客户ID)减少所有的交易:

from operators import add 
# ddf is a dataframe with a transaction in each row. Key is the column 
# we want to group the transactions by. 

txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add) 

这给出了一个rdd看起来像(key, [list of Rows])。要将其写回dataframe,您需要构建模式。交易清单可以模拟ArrayType

from pyspark.sql import types as sqxt 
txn_schema = sqxt.StructType([ 
    sqxt.StructField('Key', sqxt.StringType()), 
    sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema)) 
]) 

然后,它的直接的将数据写入到磁盘中的这种结构:

txnddf = txnrdd.toDF(schema=txn_schema) 
txnddf.write.parquet('customer-transactions.parquet') 

表现似乎确定。如果不通过RDD,找不到方法。