2016-10-11 59 views
0

假设您有来自两个不同pubsub主题的Google云数据流信号,并且您想要将来自一个主题的信号与来自另一个主题的信号进行比较,并生成匹配项,重新平等。如果同时(或几乎同时)在一个主题中有多个相同的信号进入,那么我们不应该生成匹配。是否有可能以近乎实时的方式在Dataflow中生成匹配,以便让我们100%确定生成的匹配是否正确(即没有误报)?如果是这样,你将如何实现它?由于数据可能会延迟到Dataflow的时间窗口,所以我怀疑这很难。为了简化,如果我们从一个pubsub主题获得“A”,并从另一个主题获得“A”,我们应该生成一个匹配,但只有在没有任何其他A进入时话题几乎在同一时间(正负1秒)。比较Google云数据流中唯一匹配的数据集

回答

0

如果我正确理解你的问题,我会按照如下方式分解它。我将使用我们的标准四个问题,因为它在这种情况下很有帮助。

  • 你在计算什么?” - 您正在进行流式加入,根据您的描述,我认为您会希望选择基于CoGroupByKey的加入。
  • Where in event time do you want to group your data?” - 如果事件发生在附近,您有兴趣一起观看活动。这大致对应于SDK中提供的Sessions。但是我不能告诉你是否希望会话通过两个流,或者每个流。希望我的回答能让你足够工作。
  • When do you want to produce output?” - 根据您的100%确定性目标,我认为您只有在决定放弃所有更多数据后才会生产产量。要真正具有100%的确定性,您必须知道数据来自哪里(这与数据流无关)。
  • 如何改进先前的输出?” - 由于单个连接结果没有多个输出,所以这不是问题。

让我们假设你有这些输入:

PCollection<String> streamA 
PCollection<String> streamB 

,如果你想基于整个流同时发生做到这一点匹配+内重复数据删除,那么你可以简单窗户进入Sessions,并做了CoGroupByKey

PCollection<KV<String, String>> windowedA = streamA 
    .apply(WithKeys.of(String v -> v)) 
    .apply(Window.into(
     Sessions.withGapDuration(Duration.standardSeconds(1)))); 

PCollection<KV<String, String>> windowedB = // ditto 

// Set up join handles 
TupleTag<String> tagA = new TupleTag<String>() {}; 
TupleTag<String> tagb = new TupleTag<String>() {}; 

KeyedPCollectionTuple joinInput = 
    KeyedPCollectionTuple 
     .of(tagA, windowedA) 
     .and(tagB, windowedB); 

PCollection<String> result = joinInput 

    // Group streams together by shared key 
    .apply(CoGroupByKey.create()) 

    // Eliminate all but 1-to-1 matches 
    .apply(Filter.by(
     KV<String, CoGbkResult> joined -> 
      Iterables.size(joined.getValue().getAll(tagA)) == 1 
      && Iterables.size(joined.getValue().getAll(tagB)) == 1)) 

    // The key is all we care about 
    .apply(Keys.create()); 

集合result包含近乎同时匹配的字符串,但在一秒内没有重复。你真正的用例可能需要一些调整。

如果您想单独重复删除两个流,这稍微复杂一些,但不要太多。您需要分别为每个流分配会话和GroupByKey,然后根据想要如何显示连接输出来重新窗口化。

+0

非常好的答案。谢谢! –