2017-06-05

Google Cloud Dataflow problem writing data (TextIO or DatastoreIO)

OK, everyone. Another Dataflow question from a Dataflow newbie (just started playing with it this week..).

I'm creating a data pipeline that takes a list of product names and generates autocomplete data. The data-processing part seems to be working fine, but I must be missing something obvious, because when I add the final ".apply" to write the data with either DatastoreIO or TextIO, I get a syntax error in my IDE that says the following:

"The method apply(DatastoreV1.Write) is undefined for the type ParDo.SingleOutput<KV<String, List<String>>, Entity>"

It gives me the option of adding a cast to the method receiver, but that's obviously not the answer. Is there some other step I need to perform before I try to write the data out? My last step before attempting the write is a call to Dataflow's Entity helper to change my pipeline structure from KV<String, List<String>> to Entity, which seems like what I need in order to write to Datastore.

I've gotten so frustrated with this over the past few days that I even decided to write the data to some AVRO files instead, so I could just load it into Datastore directly. Imagine how ticked I was when I got all of that working and then hit the exact same error, in exactly the same place, on my TextIO call. That is why I think I must be missing something really obvious here.

Here is my code. I've included all of it for reference, but you probably only need to look at main[] at the bottom. Any input would be greatly appreciated! Thanks!

MrSimmonsSr

package com.client.autocomplete; 

import com.client.autocomplete.AutocompleteOptions; 


import com.google.datastore.v1.Entity; 
import com.google.datastore.v1.Key; 
import com.google.datastore.v1.Value; 

import static com.google.datastore.v1.client.DatastoreHelper.makeKey; 
import static com.google.datastore.v1.client.DatastoreHelper.makeValue; 
import org.apache.beam.sdk.coders.DefaultCoder; 

import org.apache.beam.sdk.Pipeline; 
import org.apache.beam.sdk.PipelineResult; 
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; 
import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.PCollectionList; 
import com.google.api.services.bigquery.model.TableRow; 
import com.google.common.base.MoreObjects; 
import org.apache.beam.sdk.io.TextIO; 
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.transforms.PTransform; 
import org.apache.beam.sdk.transforms.Create; 
import org.apache.beam.sdk.transforms.DoFn; 
import org.apache.beam.sdk.transforms.MapElements; 
import org.apache.beam.sdk.transforms.ParDo; 
import org.apache.beam.sdk.transforms.SimpleFunction; 
import org.apache.beam.sdk.transforms.GroupByKey; 
import org.apache.beam.sdk.transforms.DoFn.ProcessContext; 
import org.apache.beam.sdk.transforms.DoFn.ProcessElement; 
import org.apache.beam.sdk.extensions.jackson.ParseJsons; 
import org.apache.beam.sdk.values.KV; 
import org.apache.beam.sdk.options.Default; 
import org.apache.beam.sdk.options.Description; 
import org.apache.beam.sdk.options.PipelineOptionsFactory; 
import org.apache.beam.sdk.options.StreamingOptions; 
import org.apache.beam.sdk.options.Validation; 
import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import java.io.IOException; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 
import java.util.List; 
import java.util.ArrayList; 

/* 
* A simple Dataflow pipeline to create autocomplete data from a list of 
* product names. It then loads that prefix data into Google Cloud Datastore for consumption by 
* a Google Cloud Function. That function will take in a prefix and return a list of 10 product names 
* 
* Pseudo Code Steps 
* 1. Load a list of product names from Cloud Storage 
* 2. Generate prefixes for use with autocomplete, based on the product names 
* 3. Merge the prefix data together with 10 products per prefix 
* 4. Write that prefix data to the Cloud Datastore as a KV with a <String>, List<String> structure 
* 
*/ 

public class ClientAutocompletePipeline { 
    private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class); 


    /** 
    * A DoFn that keys each product name by all of its prefixes. 
    * This creates one row in the PCollection for each prefix<->product_name pair 
    */ 
    private static class AllPrefixes 
    extends DoFn<String, KV<String, String>> { 
     private final int minPrefix; 
     private final int maxPrefix; 

     public AllPrefixes(int minPrefix) { 
      this(minPrefix, 10); 
     } 

     public AllPrefixes(int minPrefix, int maxPrefix) { 
      this.minPrefix = minPrefix; 
      this.maxPrefix = maxPrefix; 
     } 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      String productName= c.element().toString(); 
      for (int i = minPrefix; i <= Math.min(productName.length(), maxPrefix); i++) { 
       c.output(KV.of(productName.substring(0, i), c.element())); 
      } 
     } 
    } 

    /** 
    * Takes as input the top product names per prefix, and emits an entity 
    * suitable for writing to Cloud Datastore. 
    * 
    */ 
    static class FormatForDatastore extends DoFn<KV<String, List<String>>, Entity> { 
     private String kind; 
     private String ancestorKey; 

     public FormatForDatastore(String kind, String ancestorKey) { 
      this.kind = kind; 
      this.ancestorKey = ancestorKey; 
     } 

     @ProcessElement 
     public void processElement(ProcessContext c) { 
      // Initialize an EntityBuilder and get it a valid key 
      Entity.Builder entityBuilder = Entity.newBuilder(); 
      Key key = makeKey(kind, ancestorKey).build(); 
      entityBuilder.setKey(key); 

      // New HashMap to hold all the properties of the Entity 
      Map<String, Value> properties = new HashMap<>(); 
      String prefix = c.element().getKey(); 
      String productsString = "Products["; 

      // iterate through the product names and add each one to the productsString 
      for (String productName : c.element().getValue()) { 
       // products.add(productName); 
       productsString += productName + ", "; 
      } 
      productsString += "]"; 

      properties.put("prefix", makeValue(prefix).build());    
      properties.put("products", makeValue(productsString).build()); 
      entityBuilder.putAllProperties(properties); 
      c.output(entityBuilder.build()); 
     } 
    } 


    /** 
    * Options supported by this class. 
    * 
    * <p>Inherits standard Beam example configuration options. 
    */ 
    public interface Options 
    extends AutocompleteOptions { 
     @Description("Input text file") 
     @Validation.Required 
     String getInputFile(); 
     void setInputFile(String value); 

     @Description("Cloud Datastore entity kind") 
     @Default.String("prefix-product-map") 
     String getKind(); 
     void setKind(String value); 

     @Description("Whether output to Cloud Datastore") 
     @Default.Boolean(true) 
     Boolean getOutputToDatastore(); 
     void setOutputToDatastore(Boolean value); 

     @Description("Cloud Datastore ancestor key") 
     @Default.String("root") 
     String getDatastoreAncestorKey(); 
     void setDatastoreAncestorKey(String value); 

     @Description("Cloud Datastore output project ID, defaults to project ID") 
     String getOutputProject(); 
     void setOutputProject(String value); 
    } 


    public static void main(String[] args) throws IOException{ 

     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); 

     // create the pipeline 
     Pipeline p = Pipeline.create(options); 

     PCollection<String> toWrite = p 

      // A step to read in the product names from a text file on GCS 
      .apply(TextIO.read().from("gs://sample-product-data/clean_product_names.txt")) 

      // Next expand the product names into KV pairs with prefix as key (<KV<String, String>>) 
      .apply("Explode Prefixes", ParDo.of(new AllPrefixes(2))) 

      // Apply a GroupByKey transform to the PCollection "flatCollection" to create "productsGroupedByPrefix". 
      .apply(GroupByKey.<String, String>create()) 

      // Now format the PCollection for writing into the Google Datastore 
      .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(), 
        options.getDatastoreAncestorKey())) 

      // Write the processed data to the Google Cloud Datastore 
      // NOTE: This is the line that I'm getting the error on!! 
      .apply(DatastoreIO.v1().write().withProjectId(MoreObjects.firstNonNull(
        options.getOutputProject(), options.getOutputProject())))); 

     // Run the pipeline. 
     PipelineResult result = p.run(); 
    } 
} 

Answer


I think you need another closing parenthesis. I've removed some extraneous bits and indented according to the parentheses:

PCollection<String> toWrite = p 
    .apply(TextIO.read().from("...")) 
    .apply("Explode Prefixes", ...) 
    .apply(GroupByKey.<String, String>create()) 
    .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(
     options.getKind(), options.getDatastoreAncestorKey())) 
     .apply(...); 

Specifically, you need another parenthesis to close the apply("FormatForDatastore", ...). Right now it is trying to call ParDo.of(...).apply(...), which doesn't work.
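
For reference, a rough, untested sketch of what the end of main() could look like with the parentheses balanced (the names come from the question; note that FormatForDatastore also has to accept KV<String, Iterable<String>>, as discussed in the comments below):

PCollection<KV<String, Iterable<String>>> productsGroupedByPrefix = p
    .apply(TextIO.read().from("gs://sample-product-data/clean_product_names.txt"))
    .apply("Explode Prefixes", ParDo.of(new AllPrefixes(2)))
    .apply(GroupByKey.<String, String>create());

productsGroupedByPrefix
    // this apply(...) is now closed before the write step begins
    .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(
        options.getKind(), options.getDatastoreAncestorKey())))
    // project id taken from the question's options
    .apply(DatastoreIO.v1().write().withProjectId(options.getOutputProject()));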


Thank you very much for the input. Are you saying I need to add a close paren after 'options.getDatastoreAncestorKey()))'? If so, when I do that I get a red syntax error underlining the whole apply I just added it to, telling me: 'Type mismatch: cannot convert from ParDo.SingleOutput<KV<String, List<String>>, Entity> to PTransform<? super PCollection<KV<String, Iterable<String>>>, OutputT>' – MrSimmonsSr


Yes, you need to add that paren. The later error indicates that you have a transform that expects a collection of 'KV<String, List<String>>' as input, but you are giving it a collection of 'KV<String, Iterable<String>>'. The FormatForDatastore DoFn should extend 'DoFn<KV<String, Iterable<String>>, Entity>'. –
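
For illustration, a minimal sketch of that signature change, reusing the question's static imports (makeKey/makeValue) and its entity-building logic; only the input element type of the DoFn differs:

static class FormatForDatastore extends DoFn<KV<String, Iterable<String>>, Entity> {
    private final String kind;
    private final String ancestorKey;

    FormatForDatastore(String kind, String ancestorKey) {
        this.kind = kind;
        this.ancestorKey = ancestorKey;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        Entity.Builder entityBuilder = Entity.newBuilder();
        entityBuilder.setKey(makeKey(kind, ancestorKey).build());

        // After GroupByKey, the value is an Iterable<String>; the existing
        // for-each loop over product names works on it unchanged.
        StringBuilder products = new StringBuilder("Products[");
        for (String productName : c.element().getValue()) {
            products.append(productName).append(", ");
        }
        products.append("]");

        Map<String, Value> properties = new HashMap<>();
        properties.put("prefix", makeValue(c.element().getKey()).build());
        properties.put("products", makeValue(products.toString()).build());
        entityBuilder.putAllProperties(properties);
        c.output(entityBuilder.build());
    }
}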