我花了相当长的一段时间才弄清楚,虽然答案很简单,所以我想我会在这里发布我的解决方案。
首先由key
(客户ID)减少所有的交易:
from operators import add
# ddf is a dataframe with a transaction in each row. Key is the column
# we want to group the transactions by.
txnrdd = ddf.rdd.map(lambda row: (row['key'], [row],)).reduceByKey(add)
这给出了一个rdd
看起来像(key, [list of Rows])
。要将其写回dataframe
,您需要构建模式。交易清单可以模拟ArrayType
。
from pyspark.sql import types as sqxt
txn_schema = sqxt.StructType([
sqxt.StructField('Key', sqxt.StringType()),
sqxt.StructField('Transactions', sqxt.ArrayType(ddf.schema))
])
然后,它的直接的将数据写入到磁盘中的这种结构:
txnddf = txnrdd.toDF(schema=txn_schema)
txnddf.write.parquet('customer-transactions.parquet')
表现似乎确定。如果不通过RDD,找不到方法。