我有以下类型的示例数据。使用DataFlow的总和平均聚合
s.n., time, user, time_span, user_level
1, 2016-01-04T1:26:13, Hari, 8, admin
2, 2016-01-04T11:6:13, Gita, 2, admin
3, 2016-01-04T11:26:13, Gita, 0, user
现在我需要找到average_time_span/user
,average_time_span/user_level
和total_time_span/user
。
我可以找到上面提到的每个值,但无法一次找到所有这些值。由于我是DataFlow的新手,请为我推荐适当的方法。
static class ExtractUserAndUserLevelFn extends DoFn<String, KV<String, Long>> {
@Override
public void processElement(ProcessContext c) {
String[] words = c.element().split(",");
if (words.length == 5) {
Instant timestamp = Instant.parse(words[1].trim());
KV<String, Long> userTime = KV.of(words[2].trim(), Long.valueOf(words[3].trim()));
KV<String, Long> userLevelTime = KV.of(words[4].trim(), Long.valueOf(words[3].trim()));
c.outputWithTimestamp(userTime, timestamp);
c.outputWithTimestamp(userLevelTime, timestamp);
}
}
}
public static void main(String[] args) {
TestOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(TestOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
.apply(ParDo.of(new ExtractUserAndUserLevelFn()))
.apply(Window.<KV<String, Long>>into(
FixedWindows.of(Duration.standardSeconds(options.getMyWindowSize()))))
.apply(GroupByKey.<String, Long>create())
.apply(ParDo.of(new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
public void processElement(ProcessContext c) {
String key = c.element().getKey();
Iterable<Long> docsWithThatUrl = c.element().getValue();
Long sum = 0L;
for (Long item : docsWithThatUrl)
sum += item;
KV<String, Long> userTime = KV.of(key, sum);
c.output(userTime);
}
}))
.apply(MapElements.via(new FormatAsTextFn()))
.apply(TextIO.Write.named("WriteCounts").to(options.getOutput()).
withNumShards(options.getShardsNumber()));
p.run();
}
但我需要找到不同列的平均值和不同列值的平均值。如何在单个程序中做到这一点。 – Lionel
你会想把它们分别处理成一个单独的PCollection,从原来的PCollection中分支出来。 –
我可以使用[sideOutput](https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/transforms/PDoDo)进行此操作。 – Lionel