2016-01-06 154 views
2

Sum and average aggregation using Dataflow

I have sample data of the following form.

s.n., time, user, time_span, user_level 
1, 2016-01-04T1:26:13, Hari, 8, admin 
2, 2016-01-04T11:6:13, Gita, 2, admin 
3, 2016-01-04T11:26:13, Gita, 0, user 

Now I need to find average_time_span/user, average_time_span/user_level, and total_time_span/user.

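I can compute each of the values above individually, but I cannot compute all of them in a single run. I am new to Dataflow, so please suggest an appropriate approach.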

static class ExtractUserAndUserLevelFn extends DoFn<String, KV<String, Long>> { 
     @Override 
     public void processElement(ProcessContext c) { 

      String[] words = c.element().split(","); 

      if (words.length == 5) { 
       Instant timestamp = Instant.parse(words[1].trim());      
       KV<String, Long> userTime = KV.of(words[2].trim(), Long.valueOf(words[3].trim())); 
       KV<String, Long> userLevelTime = KV.of(words[4].trim(), Long.valueOf(words[3].trim()));      
       c.outputWithTimestamp(userTime, timestamp); 
       c.outputWithTimestamp(userLevelTime, timestamp); 

      } 
     } 
    } 


public static void main(String[] args) { 
    TestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
      .as(TestOptions.class); 
    Pipeline p = Pipeline.create(options); 
    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) 
      .apply(ParDo.of(new ExtractUserAndUserLevelFn())) 
      .apply(Window.<KV<String, Long>>into(
        FixedWindows.of(Duration.standardSeconds(options.getMyWindowSize())))) 
      .apply(GroupByKey.<String, Long>create()) 
      .apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() { 
       public void processElement(ProcessContext c) { 
        String key = c.element().getKey(); 
        Iterable<Long> docsWithThatUrl = c.element().getValue(); 
        Long sum = 0L; 
        for (Long item : docsWithThatUrl) 
         sum += item; 
        KV<String, Long> userTime = KV.of(key, sum); 
        c.output(userTime); 
       } 
      })) 
      .apply(MapElements.via(new FormatAsTextFn())) 
      .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()). 
        withNumShards(options.getShardsNumber())); 

    p.run(); 
} 

Answers

2

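One approach is to first parse the lines into a PCollection that holds one record per line, and then create two PCollections of key-value pairs from that collection. Say you define a record representing a line like this: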

static class Record implements Serializable { 
    final String user; 
    final String role; 
    final long duration; 

    Record(String user, String role, long duration) { 
        this.user = user; 
        this.role = role; 
        this.duration = duration; 
    } 
} 

Now write a LineToRecordFn that creates a Record from each input line, so that you can do:

PCollection<Record> records = p.apply(TextIO.Read.named("ReadLines") 
           .from(options.getInputFile())) 
           .apply(ParDo.of(new LineToRecordFn())); 
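
The answer does not show LineToRecordFn itself. As a rough sketch of the parsing step it could wrap, here is plain Java that runs without the SDK. The names RecordParser and parse are hypothetical, the column order (s.n., time, user, time_span, user_level) is taken from the question, and skipping malformed lines is an assumption rather than something the answer specifies.

```java
import java.io.Serializable;
import java.util.Optional;

// Hypothetical helper: the parsing logic a LineToRecordFn could delegate to.
class RecordParser {

    // Same shape as the Record class above, repeated so this sketch compiles on its own.
    static class Record implements Serializable {
        final String user;
        final String role;
        final long duration;

        Record(String user, String role, long duration) {
            this.user = user;
            this.role = role;
            this.duration = duration;
        }
    }

    // Assumed column order: s.n., time, user, time_span, user_level
    static Optional<Record> parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 5) {
            return Optional.empty(); // wrong number of columns
        }
        try {
            long duration = Long.parseLong(fields[3].trim());
            return Optional.of(new Record(fields[2].trim(), fields[4].trim(), duration));
        } catch (NumberFormatException e) {
            return Optional.empty(); // non-numeric time_span, e.g. the header row
        }
    }
}
```

Inside the DoFn's processElement, one option would be to call this on c.element() and pass the result to c.output only when a Record is present.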

You can window here if you want. Whether you window or not, you can then create one PCollection keyed by role and one keyed by user:

PCollection<KV<String,Long>> role_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() { 
      @Override 
      public KV<String,Long> apply(Record r) { 
        return KV.of(r.role, r.duration); 
      } 
    })); 

PCollection<KV<String,Long>> user_duration = records.apply(MapElements.via(
    new SimpleFunction<Record,KV<String,Long>>() { 
      @Override 
      public KV<String,Long> apply(Record r) { 
        return KV.of(r.user, r.duration); 
      } 
    })); 

Now you can get the means and sums in just a few lines:

PCollection<KV<String,Double>> mean_by_user = user_duration.apply(
    Mean.<String,Long>perKey()); 
PCollection<KV<String,Double>> mean_by_role = role_duration.apply(
    Mean.<String,Long>perKey()); 
PCollection<KV<String,Long>> sum_by_role = role_duration.apply(
    Sum.<String>longsPerKey()); 

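Note that Dataflow performs some optimizations before running the job. So although it may look as if two passes are made over the records PCollection, that is not necessarily the case.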
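
As a sanity check on what the per-key aggregations should return for the three sample rows in the question, the following plain-Java sketch computes the same quantities locally with Java streams. It is not Dataflow code: the Pair and LocalAggregates names are hypothetical stand-ins, and the data is hard-coded from the question. The question asked for the total per user rather than per role; that should follow from applying the same Sum.<String>longsPerKey() transform to user_duration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class LocalAggregates {

    // One (key, time_span) pair, standing in for KV<String, Long>.
    static class Pair {
        final String key;
        final long value;

        Pair(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    // Per-key arithmetic mean; TreeMap keeps keys sorted for stable printing.
    static Map<String, Double> meanPerKey(List<Pair> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
            p -> p.key, TreeMap::new, Collectors.averagingLong(p -> p.value)));
    }

    // Per-key sum.
    static Map<String, Long> sumPerKey(List<Pair> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
            p -> p.key, TreeMap::new, Collectors.summingLong(p -> p.value)));
    }

    public static void main(String[] args) {
        // The three sample rows, keyed by user and by user_level.
        List<Pair> userDuration = Arrays.asList(
            new Pair("Hari", 8), new Pair("Gita", 2), new Pair("Gita", 0));
        List<Pair> roleDuration = Arrays.asList(
            new Pair("admin", 8), new Pair("admin", 2), new Pair("user", 0));

        System.out.println(meanPerKey(userDuration)); // {Gita=1.0, Hari=8.0}
        System.out.println(meanPerKey(roleDuration)); // {admin=5.0, user=0.0}
        System.out.println(sumPerKey(userDuration));  // {Gita=2, Hari=8}
    }
}
```

If windowing is applied in the pipeline, the transforms aggregate per key within each window, so results may differ from these whole-dataset values.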

1

The Mean and Sum transforms look like they would work well for this use case. Basic usage looks like this:

PCollection<KV<String, Double>> meanPerKey = 
    input.apply(Mean.<String, Integer>perKey()); 

PCollection<KV<String, Integer>> sumPerKey = input 
    .apply(Sum.<String>integersPerKey()); 
+0

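But I need the average keyed by different columns and by the different values within those columns. How can I do that in a single program? – Lionel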

+1

You'll want to handle each of them as its own PCollection, branching off from the original PCollection. –

+0

I can use [sideOutput](https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/PDoDo) for this. – Lionel