2017-06-19 25 views
0

我将WindowsWordCount example程序的源程序从文本文件更改为云Pub/Sub,如下所示。我将莎士比亚文件的数据发布到Pub/Sub中,但是它没有正确提取,但.groupByKey似乎没有任何转换。Scio:使用Pub/Sub作为收集源时,groupByKey不起作用

sc.pubsubSubscription[String](psSubscription) 
    .withFixedWindows(windowSize) // apply windowing logic 
    .flatMap(_.split("[^a-zA-Z']+").filter(_.nonEmpty)) 
    .countByValue 
    .withWindow[IntervalWindow] 
    .swap 
    .groupByKey 
    .map { 
    s => 
     println("\n\n\n\n\n\n\n This never prints \n\n\n\n\n") 
     println(s) 
    } 

回答

2

将输入从文本文件更改为PubSub PCollection“unbounded”。按键对其进行分组需要定义聚合触发器,否则分组器将永远等待。它是这里的数据流文件中提到: https://cloud.google.com/dataflow/model/group-by-key

注:要么非全局窗口化或聚合触发以无界PCollection执行GroupByKey是必需的。这是因为有界的GroupByKey必须等待收集具有特定密钥的所有数据;但无限收集,数据是无限的。窗口化和/或触发器允许分组在无界数据流内的逻辑,有限数据集上操作。

如果将GroupByKey应用于无界PCollection而未设置非全局窗口策略或触发器策略或两者,则Dataflow将在构建管道时生成IllegalStateException错误。

不幸的是,在Apache Beam的Python SDK似乎不支持触发器(还),所以我不知道python中的解决方案是什么。

(见https://beam.apache.org/documentation/programming-guide/#triggers

1

根据以上(我会回答他的评论特别是如果StackOverflow上会让我!)我看到文档说触发不落实关于弗朗茨的评论......但他们也说实时数据库功能不可用,而我们目前的项目正在积极使用它们。他们只是新的。

见触发功能在这里:https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/trigger.py

当心,API是未完成的,因为这不是“发布就绪”代码。但它是可用的。

+1

看起来你可以把它写成一个答案,因为它代表着它的评论,因为你提到你会喜欢它。将来不要对问题做出评论回答,违反规则,如果你有答案,将它作为一个整体发布,如果你不这样做,不要作为回答发布。这篇文章的措辞使其成为删除的候选人。 – snb