2017-10-11 45 views

回答

1

没有办法检查PCollection的大小而不应用PTransform(例如Count.globally()或Combine.combineFn()),因为PCollection不像Java SDK中的典型集合等等。

这是一个有界或无界的数据集合的抽象,其中数据被送入集合中以应用于其上的操作(例如PTransform)。它也是并行的(正如课程开始时的P所示)。

因此,您需要一种机制来获取每个工作人员/节点的元素数量并将它们合并以获取值。直到变换结束,才能知道它是0还是n。

1

您没有指定您使用的是哪个SDK,因此我假设了Python。该代码很容易移植到Java。

您可以应用全局元素计数,然后通过应用简单比较将数值映射到布尔值。您将可以一边输入使用pvalue.AsSingleton功能这个值,像这样:

侧输入
import apache_beam as beam 
from apache_beam import pvalue 

is_empty_check = (your_pcollection 
        | "Count" >> beam.combiners.Count.Globally() 
        | "Is empty?" >> beam.Map(lambda n: n == 0) 
        ) 

another_pipeline_branch = (
    p 
    | beam.Map(do_something, is_empty=pvalue.AsSingleton(is_empty_check)) 
) 

用法如下:

def do_something(element, is_empty): 
    if is_empty: 
     # yes 
    else: 
     # no