2017-09-22 64 views
0

我有一个很烦人的文件集结构,像这样:星火/蜂巢 - 组数据为“枢轴表”格式

userId string, 
eventType string, 
source string, 
errorCode string, 
startDate timestamp, 
endDate timestamp 

每个文件可能包含的每EVENTID记录的任意数,用不同eventTypes和来源,以及不同的代码和每个的开始/结束日期。

是否有蜂巢的方式或火花组所有这些一起userId的,有点像一个key-value,其中值是与用户id相关联的所有字段列表?具体来说,我希望它通过eventType和源键入。基本上我想交易宽度的表格长度,有点像数据透视表。我的目标是最终将存储为Apache Parquet或Avro文件格式,以便将来进行更快速的分析。

下面是一个例子:

源数据:

userId, eventType, source, errorCode, startDate, endDate 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452' 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775' 
552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229' 
552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976' 
284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 
552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623' 
284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777' 

目标:

userId, eventTypeAckProvider, sourceAckProvider, errorCodeAckProvider, startDateAckProvider, endDateAckProvider, eventTypeTradeMerch, sourceTradeMerch, errorCodeTradeMerch, startDateTradeMerch, endDateTradeMerch, eventTypeChargeMerch, sourceChargeMerch, errorCodeChargeMerch, startDateChargeMerch, endDateChargeMerch, eventTypeCloseProvider, sourceCloseProvider, errorCodeCloseProvider, startDateCloseProvider, endDateCloseProvider, eventTypeRefundMerch, sourceRefundMerch, errorCodeRefundMerch, startDateRefundMerch, endDateRefundMerch 
552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623', NULL, NULL, NULL, NULL, NULL 
284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947' 

字段名称或顺序并不重要,只要我能区分它们。

我已经试过两种方法已经得到这个工作:

  1. 手动从表中选择的每个组合,并加入到主数据集。这很好,并行性好,但不允许关键字段的任意数量的值,并且需要预定义模式。
  2. 使用Spark创建一个key:value记录的字典,其中每个值都是一个字典。基本上遍历数据集,如果字典不存在,则向字典添加一个新的键;如果该字段不存在,则为该条目添加一个新的字段到值字典中。这种方法非常精美,但速度非常慢,如果完全平行的话也不会很好地并行化。另外我不确定这是否是Avro/Parquet兼容格式。

是否有任何替代这两个方法?甚至比我的目标更好的结构?

回答

1

你想有这样的事情?

from pyspark.sql.functions import struct, col, create_map, collect_list 

df = sc.parallelize([ 
    ['552113', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'], 
    ['284723', 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'], 
    ['552113', 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'], 
    ['552113', 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'], 
    ['284723', 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'], 
    ['552113', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'], 
    ['284723', 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777'] 
]).toDF(('userId', 'eventType', 'source', 'errorCode', 'startDate', 'endDate')) 
df.show() 

new_df = df.withColumn("eventType_source", struct([col('eventType'), col('source')])).\ 
    withColumn("errorCode_startEndDate", struct([col('errorCode'), col('startDate'), col('endDate')])) 

new_df = new_df.groupBy('userId').agg(collect_list(create_map(col('eventType_source'), col('errorCode_startEndDate'))).alias('event_detail')) 
new_df.show() 
+0

谢谢!这似乎可以工作!我在我的实时数据集上尝试过它,就它的分组方式而言,它几乎返回了我想要的结果。尽管如此,我并不熟悉“地图列表”数据结构,并且在操作的任何地方都找不到任何记录。我想后续的问题是,我如何与这个数据结构交互?作为一个例子,我如何获得特定用户的CHARGE/MERCH属性? –

+1

很高兴它帮助!我认为这可能有助于开始跟进问题:'从itertools导入链; new_df.printSchema(); 。 RDD1集= new_df.where(COL( '用户id')== '552113')选择( 'event_detail')rdd.flatMap(拉姆达X:链(*(X)))。 keys = rdd1.map(lambda x:x.keys())。collect(); values = rdd1.map(lambda x:x.values())。collect();''keys'和'values'是需要研究的。 – Prem

0

你可以试试这个,给你的意见,

>>> from pyspark.sql import SparkSession 
>>> from pyspark.sql import functions as F 
>>> from pyspark.sql.types import * 

>>> spark = SparkSession.builder.getOrCreate() 

>>> l=[(552113, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.432', '2017-09-01 12:01:45.452'),(284723, 'ACK', 'PROVIDER', 0, '2017-09-01 12:01:45.675', '2017-09-01 12:01:45.775'),(552113, 'TRADE', 'MERCH', 0, '2017-09-01 12:01:47.221', '2017-09-01 12:01:46.229'),(552113, 'CHARGE', 'MERCH', 0, '2017-09-01 12:01:48.123', '2017-09-01 12:01:48.976'),(284723, 'REFUND', 'MERCH', 1, '2017-09-01 12:01:48.275', '2017-09-01 12:01:48.947'),(552113, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:49.908', '2017-09-01 12:01:50.623'),(284723, 'CLOSE', 'PROVIDER', 0, '2017-09-01 12:01:50.112', '2017-09-01 12:01:50.777')] 

>>> df = spark.createDataFrame(l,['userId', 'eventType', 'source', 'errorCode', 'startDate','endDate']) 
>>> df.show(10,False) 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|userId|eventType|source |errorCode|startDate    |endDate    | 
+------+---------+--------+---------+-----------------------+-----------------------+ 
|552113|ACK  |PROVIDER|0  |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452| 
|284723|ACK  |PROVIDER|0  |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775| 
|552113|TRADE |MERCH |0  |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229| 
|552113|CHARGE |MERCH |0  |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976| 
|284723|REFUND |MERCH |1  |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|CLOSE |PROVIDER|0  |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623| 
|284723|CLOSE |PROVIDER|0  |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777| 
+------+---------+--------+---------+-----------------------+-----------------------+ 

>>> myudf = F.udf(lambda *cols : cols,ArrayType(StringType())) #composition to create rowwise list 
>>> df1 = df.select('userId',myudf('eventType', 'source', 'errorCode','startDate', 'endDate').alias('val_list')) 

>>> df2 = df1.groupby('userId').agg(F.collect_list('val_list')) # grouped on userId 

>>> eventtypes = ['ACK','TRADE','CHARGE','CLOSE','REFUND'] # eventtypes and the order required in output 

>>> def f(Vals): 
     aggVals = [typ for x in eventtypes for typ in Vals if typ[0] == x] # to order the grouped data based on eventtypes above 
     if len(aggVals) == 5: 
      return aggVals 
     else: 
      missngval = [(idx,val) for idx,val in enumerate(eventtypes)if val not in zip(*aggVals)[0]] # get missing eventtypes with their index to create null 
      for idx,val in missngval: 
       aggVals.insert(idx,[None]*5) 
      return aggVals 

>>> myudf2 = F.udf(f,ArrayType(ArrayType(StringType()))) 
>>> df3 = df2.select('userId',myudf2('agg_list').alias('values')) 

>>> df4 = df3.select(['userId']+[df3['values'][i][x] for i in range(5) for x in range(5)]) # to select from Array[Array] 

>>> oldnames = df4.columns 
>>> destnames = ['userId', 'eventTypeAckProvider', 'sourceAckProvider', 'errorCodeAckProvider', 'startDateAckProvider', 'endDateAckProvider', 'eventTypeTradeMerch', 'sourceTradeMerch', 'errorCodeTradeMerch', 'startDateTradeMerch', 'endDateTradeMerch', 'eventTypeChargeMerch', 'sourceChargeMerch', 'errorCodeChargeMerch', 'startDateChargeMerch', 'endDateChargeMerch', 'eventTypeCloseProvider', 'sourceCloseProvider', 'errorCodeCloseProvider', 'startDateCloseProvider', 'endDateCloseProvider', 'eventTypeRefundMerch', 'sourceRefundMerch', 'errorCodeRefundMerch', 'startDateRefundMerch', 'endDateRefundMerch'] 

>>> finalDF = reduce(lambda d,idx : d.withColumnRenamed(oldnames[idx],destnames[idx]),range(len(oldnames)),df4) # Renaming the columns 
>>> finalDF.show()  

|userId|eventTypeAckProvider|sourceAckProvider|errorCodeAckProvider|startDateAckProvider |endDateAckProvider  |eventTypeTradeMerch|sourceTradeMerch|errorCodeTradeMerch|startDateTradeMerch |endDateTradeMerch  |eventTypeChargeMerch|sourceChargeMerch|errorCodeChargeMerch|startDateChargeMerch |endDateChargeMerch  |eventTypeCloseProvider|sourceCloseProvider|errorCodeCloseProvider|startDateCloseProvider |endDateCloseProvider |eventTypeRefundMerch|sourceRefundMerch|errorCodeRefundMerch|startDateRefundMerch |endDateRefundMerch  | 

|284723|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.675|2017-09-01 12:01:45.775|null    |null   |null    |null     |null     |null    |null    |null    |null     |null     |CLOSE     |PROVIDER   |0      |2017-09-01 12:01:50.112|2017-09-01 12:01:50.777|REFUND    |MERCH   |1     |2017-09-01 12:01:48.275|2017-09-01 12:01:48.947| 
|552113|ACK     |PROVIDER   |0     |2017-09-01 12:01:45.432|2017-09-01 12:01:45.452|TRADE    |MERCH   |0     |2017-09-01 12:01:47.221|2017-09-01 12:01:46.229|CHARGE    |MERCH   |0     |2017-09-01 12:01:48.123|2017-09-01 12:01:48.976|CLOSE     |PROVIDER   |0      |2017-09-01 12:01:49.908|2017-09-01 12:01:50.623|null    |null    |null    |null     |null     | 
