Writing to BigQuery from Cloud Dataflow: Unable to create a side-input view from input

I am trying to write a Dataflow pipeline that reads a stream from Pub/Sub and writes it to BigQuery.

When I try to run the pipeline, I get the error "Unable to create a side-input view from input" with the stack trace:

Exception in thread "main" java.lang.IllegalStateException: Unable to create a side-input view from input 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:277) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:268) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:366) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:214) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:79) 
at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:68) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:290) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1738) 
at com.google.cloud.dataflow.sdk.io.BigQueryIO$Write$Bound.apply(BigQueryIO.java:1440) 
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.apply(DirectPipelineRunner.java:247) 
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:367) 
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
at co.uk.bubblestudent.dataflow.StarterPipeline.main(StarterPipeline.java:116) 
Caused by: java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey. 
at com.google.cloud.dataflow.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:192) 
at com.google.cloud.dataflow.sdk.transforms.View$AsIterable.validate(View.java:275) 
... 20 more 

My code is:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class StarterPipeline {

    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    // Builds the BigQuery schema for the output table.
    private static TableSchema schemaGen() {
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("facebookID").setType("STRING"));
        fields.add(new TableFieldSchema().setName("propertyID").setType("STRING"));
        fields.add(new TableFieldSchema().setName("time").setType("TIMESTAMP"));
        return new TableSchema().setFields(fields);
    }

    public static void main(String[] args) {
        LOG.info("Starting");
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        LOG.info("Pipeline made");
        // For Cloud execution, set the Cloud Platform project, staging location,
        // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
        options.setProject(<project>);
        options.setStagingLocation(<bucket>);
        options.setTempLocation(<bucket>);
        Pipeline p = Pipeline.create(options);

        TableSchema schema = schemaGen();
        LOG.info("Schema made");
        try {
            LOG.info(schema.toPrettyString());
        } catch (IOException e) {
            e.printStackTrace();
        }

        PCollection<String> input =
            p.apply(PubsubIO.Read.named("ReadFromPubsub").subscription(<subscription>));

        PCollection<TableRow> pardo = input.apply(ParDo.of(new FormatAsTableRowFn()));
        LOG.info("Formatted Row");

        pardo.apply(BigQueryIO.Write.named("Write into BigQuery").to(<table>)
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
        LOG.info("about to run");
        p.run();
    }

    // Turns each Pub/Sub message (a JSON string) into a BigQuery TableRow.
    static class FormatAsTableRowFn extends DoFn<String, TableRow> {
        @Override
        public void processElement(ProcessContext c) {
            LOG.info("Formatting");
            String json = c.element();

            // HashMap<String, String> items = new Gson().fromJson(json,
            //     new TypeToken<HashMap<String, String>>() {}.getType());

            // Make a BigQuery row from the JSON object (hard-coded values for now):
            TableRow row = new TableRow()
                .set("facebookID", "324234")
                .set("propertyID", "23423")
                .set("time", "12312313123");

            /*
             * TableRow row = new TableRow()
             *     .set("facebookID", items.get("facebookID"))
             *     .set("propertyID", items.get("propertyID"))
             *     .set("time", items.get("time"));
             */
            c.output(row);
        }
    }
}

Any suggestions as to what this might be?

What version of the Dataflow SDK are you using? – danielm

1.1.2 for Eclipse –

I don't believe there is a 1.1.2 release. Dataflow is now up to 1.6.0; could you try that? – danielm

Answer

The default implementation of BigQueryIO only works with bounded PCollections, while PubsubIO.Read produces an unbounded PCollection.

There are two ways to fix this: you can bound the input, by calling maxReadTime or maxNumElements on your PubsubIO transform, or you can use BigQueryIO's streaming-insert mode, by calling setStreaming(true) on your options.
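For illustration, here is a minimal sketch of both fixes applied to the asker's pipeline. It assumes the Dataflow 1.x SDK, where the bounding methods on PubsubIO.Read appear as maxNumRecords/maxReadTime and the streaming flag lives on the pipeline options; the angle-bracket placeholders are the asker's:

// Option 1: bound the Pub/Sub read so the default (batch) BigQuery write applies.
// Stopping after a record count or read duration yields a bounded PCollection.
PCollection<String> input = p.apply(
    PubsubIO.Read.named("ReadFromPubsub")
        .subscription(<subscription>)
        .maxNumRecords(1000));   // or .maxReadTime(Duration.standardMinutes(5))

// Option 2: leave the input unbounded and run the pipeline in streaming mode,
// which switches BigQueryIO.Write to streaming inserts.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setStreaming(true);
Pipeline p = Pipeline.create(options);

Note that with the second option the job should run against the Dataflow service runner; the DirectPipelineRunner shown in the stack trace does not support unbounded PCollections in the 1.x SDK.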

OK, thanks for the answer! I'll check it at the office tomorrow morning. –

This worked and my project started running, but it began outputting 'Aug 04, 2016 9:47:55 AM com.google.api.client.http.HttpRequest execute WARNING: exception thrown while executing request java.net.SocketTimeoutException: Read timed out' every 20 seconds or so. I looked into this and it seems you have to edit the socket timeout? However, some simple examples don't do this and presumably still work - https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/examples/src/main/java/com/google/cloud/dataflow/examples/complete/TrafficRoutes.java –

Can you provide more context about where the exception is coming from? In general you shouldn't need to touch the timeouts, although if you are running locally you may be on a slower or higher-latency network. – danielm