2016-11-14 30 views
0

我有一个具有四个基本领域的一些数据聚合的各个领域:Pyspark聚集 - 以不同的方式

  1. 场1是一个用于数据的关键
  2. 字段2应该是一个集合中的所有唯一值此字段
  3. 字段3是最小值(时间戳)
  4. 字段4是最大值(时间戳)

原始代码看起来是这样的:

data = (
    dataframe 
    .rdd 
    # flatten rows 
    .flatMap(lambda x: x) 
    # Parse JSON 
    .flatMap(lambda x: encode_json(x)) 
    # Capture values 
    .map(lambda x: [ 
     # Merge 'field1', 'field2' --> 'field1, field2' 
     ','.join(_ for _ in [x.get('metadata_value'), x.get('field2')]), 
     # Create pairing of low and high timestamps 
     [x.get('low'), x.get('high')] 
    ]) 
    # Aggregate into a list of low/high timestamps per 'field1, field2' 
    .aggregateByKey(list(), lambda u, v: u + [v], lambda u1, u2: u1 + u2) 
    # Flatten keys 'ip,guid' --> 'ip', 'guid' 
    .map(lambda x: (x[0].split(',')[0], x[0].split(',')[1], x[1], sum(1 for _ in x[1]))) 
    # Reduce timestamps to single values: [s1, e1], [s2, e2], ... --> s_min, e_max 
    .map(lambda x: (x[0], x[1], min(_[0] for _ in x[2]), max(_[1] for _ in x[2]), x[3])) 
) 

原始输出如下:

a | x| 20160103 | 20160107 
a | x013579 | 20160101 | 20160106 

新的输出应该是这样的:

a | {x,x013579} | 20160101 | 20160107 

回答

1

添加这2个变换你的电流输出,映射到一对RDD,并通过相应的操作(词典,最小值,最大值)减少每个字段。

data.map(lambda reg: (reg[0],[reg[1],reg[2],reg[3]])) .reduceByKey(lambda v1,v2: ({v1[0],v2[0]},min(v1[1],v2[1]), max(v1[2],v2[2])))