2017-07-03 34 views
0

按照Apache Beam 2.0.0 SDK DocumentationGroupIntoBatches只能与KV集合。GroupIntoBatches非KV元素

我的数据集只包含值,不需要引入密钥。然而,要利用GroupIntoBatches我不得不实施一个空字符串作为关键的“假”键:

static class FakeKVFn extends DoFn<String, KV<String, String>> { 
    @ProcessElement 
    public void processElement(ProcessContext c) { 
    c.output(KV.of("", c.element())); 
    } 
} 

所以整体管道如下所示:

public static void main(String[] args) { 
    PipelineOptions options = PipelineOptionsFactory.create(); 
    Pipeline p = Pipeline.create(options); 

    long batchSize = 100L; 

    p.apply("ReadLines", TextIO.read().from("./input.txt")) 
     .apply("FakeKV", ParDo.of(new FakeKVFn())) 
     .apply(GroupIntoBatches.<String, String>ofSize(batchSize)) 
     .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of()))) 
     .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() { 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(callWebService(c.element().getValue())); 
     } 
     })) 
     .apply("WriteResults", TextIO.write().to("./output/")); 

    p.run().waitUntilFinish(); 
} 

有什么办法在不引入“假”键的情况下分批分组?

回答

2

需要将KV输入提供给GroupIntoBatches,因为转换是使用状态和定时器(每个按键和窗口)实现的。

对于每个键+窗口对,状态和计时器必须连续执行(或可观察地执行)。你必须通过提供密钥(和窗口,尽管没有我知道的在Windows上并行运行的runner)来手动表示可用的并行性。两种最常见的方法是:

  1. 使用如用户ID
  2. 一些自然键选择碎片和重点的一些固定数量的随机。这可能很难调整。你必须有足够的分片才能获得足够的并行性,但每个分片都需要包含足够的数据,GroupIntoBatches实际上是有用的。

将一个虚拟键添加到所有元素中,就像在片段中一样,将导致转换不会并行执行。这与Stateful indexing causes ParDo to be run single-threaded on Dataflow Runner的讨论类似。