假设您有来自两个不同pubsub主题的Google云数据流信号,并且您想要将来自一个主题的信号与来自另一个主题的信号进行比较,并生成匹配项,重新平等。如果同时(或几乎同时)在一个主题中有多个相同的信号进入,那么我们不应该生成匹配。是否有可能以近乎实时的方式在Dataflow中生成匹配,以便让我们100%确定生成的匹配是否正确(即没有误报)?如果是这样,你将如何实现它?由于数据可能会延迟到Dataflow的时间窗口,所以我怀疑这很难。为了简化,如果我们从一个pubsub主题获得“A”,并从另一个主题获得“A”,我们应该生成一个匹配,但只有在没有任何其他A进入时话题几乎在同一时间(正负1秒)。比较Google云数据流中唯一匹配的数据集
0
A
回答
0
如果我正确理解你的问题,我会按照如下方式分解它。我将使用我们的标准四个问题,因为它在这种情况下很有帮助。
- “你在计算什么?” - 您正在进行流式加入,根据您的描述,我认为您会希望选择基于
CoGroupByKey
的加入。 - “Where in event time do you want to group your data?” - 如果事件发生在附近,您有兴趣一起观看活动。这大致对应于SDK中提供的
Sessions
。但是我不能告诉你是否希望会话通过两个流,或者每个流。希望我的回答能让你足够工作。 - “When do you want to produce output?” - 根据您的100%确定性目标,我认为您只有在决定放弃所有更多数据后才会生产产量。要真正具有100%的确定性,您必须知道数据来自哪里(这与数据流无关)。
- “如何改进先前的输出?” - 由于单个连接结果没有多个输出,所以这不是问题。
让我们假设你有这些输入:
PCollection<String> streamA
PCollection<String> streamB
,如果你想基于整个流同时发生做到这一点匹配+内重复数据删除,那么你可以简单窗户进入Sessions
,并做了CoGroupByKey
:
PCollection<KV<String, String>> windowedA = streamA
.apply(WithKeys.of(String v -> v))
.apply(Window.into(
Sessions.withGapDuration(Duration.standardSeconds(1))));
PCollection<KV<String, String>> windowedB = // ditto
// Set up join handles
TupleTag<String> tagA = new TupleTag<String>() {};
TupleTag<String> tagb = new TupleTag<String>() {};
KeyedPCollectionTuple joinInput =
KeyedPCollectionTuple
.of(tagA, windowedA)
.and(tagB, windowedB);
PCollection<String> result = joinInput
// Group streams together by shared key
.apply(CoGroupByKey.create())
// Eliminate all but 1-to-1 matches
.apply(Filter.by(
KV<String, CoGbkResult> joined ->
Iterables.size(joined.getValue().getAll(tagA)) == 1
&& Iterables.size(joined.getValue().getAll(tagB)) == 1))
// The key is all we care about
.apply(Keys.create());
集合result
包含近乎同时匹配的字符串,但在一秒内没有重复。你真正的用例可能需要一些调整。
如果您想单独重复删除两个流,这稍微复杂一些,但不要太多。您需要分别为每个流分配会话和GroupByKey
,然后根据想要如何显示连接输出来重新窗口化。
相关问题
- 1. 比较匹配和非匹配的两个数据集
- 2. 数据集中的数据比较
- 3. 比较数据集
- 4. 如何比较两栏中数据匹配(或不匹配)的数据?
- 5. PHP数据类型比较不匹配
- 6. 比较,匹配和组合数据列
- 7. 比较数据集的值
- 8. 比较数据集并返回最佳匹配
- 9. 使用Google云数据流从一个数据存储中读取数据并写入另一个数据流
- 10. 多表数据集比较列数据
- 11. 比较两个表格数据并显示不匹配和匹配数据
- 12. SQL Server数据集比较
- 13. 比较数据集使用
- 14. 比较两个数据集
- 15. 比较行计数和数据集中
- 16. 比较一个数据集与vb.net中的另一个数据集
- 17. 比较2表并保留唯一匹配的数据(基于列)并删除不匹配的
- 18. 比较2个数据集中的R
- 19. 比较SQL中的两个数据集
- 20. 比较R中的数据集
- 21. 比较Pig中的两个数据集
- 22. 比较php中的大数据集
- 23. 比较SSRS中的两个数据集
- 24. Google云数据流作业挂起
- 25. 分析Google云数据流作业
- 26. Google云端数据流管道设置
- 27. 模式匹配,多个数据集匹配百分比
- 28. R:为匹配的数据匹配包,输出数据集
- 29. Google云数据流管道中的异常云到Bigtable
- 30. SQL-比较列匹配集
非常好的答案。谢谢! –