Google Cloud Dataflow write data issue (TextIO or DatastoreIO)
OK, everyone. Another Dataflow question from a Dataflow newbie. (I just started playing with it this week.)
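I'm creating a data pipeline that takes a list of product names and generates autocomplete data. The data processing part seems to work fine, but I must be missing something obvious, because when I add the last ".apply" to write the data out with either DatastoreIO or TextIO, I get a syntax error in my IDE that says the following:
"The method apply(DatastoreV1.Write) is undefined for the type ParDo.SingleOutput<KV<String,List<String>>,Entity>"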
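The IDE offers to add a cast to the method receiver, but that obviously isn't the answer. Is there some other step I need to perform before I try to write the data out? My last step before the write is a call to an Entity helper for Dataflow that changes my pipeline structure from <KV<String, List<String>>> to <Entity>, which seems to me to be what I need in order to write to Datastore.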
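I got so frustrated with this over the last few days that I even decided to write the data to some AVRO files instead, so that I could load it into Datastore by hand. Imagine how ticked off I was when I finished all of that and got the exact same error in the exact same place on my call to TextIO. That is why I think I must be missing something very obvious here.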
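Here is my code. I included all of it for reference, but you probably only need to look at main[] at the bottom. Any input would be greatly appreciated! Thanks!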
MrSimmonsSr
package com.client.autocomplete;
import com.client.autocomplete.AutocompleteOptions;
import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Key;
import com.google.datastore.v1.Value;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.base.MoreObjects;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.extensions.jackson.ParseJsons;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
/*
* A simple Dataflow pipeline to create autocomplete data from a list of
* product names. It then loads that prefix data into Google Cloud Datastore for consumption by
* a Google Cloud Function. That function will take in a prefix and return a list of 10 product names
*
* Pseudo Code Steps
* 1. Load a list of product names from Cloud Storage
* 2. Generate prefixes for use with autocomplete, based on the product names
* 3. Merge the prefix data together with 10 products per prefix
* 4. Write that prefix data to the Cloud Datastore as a KV with a <String>, List<String> structure
*
*/
public class ClientAutocompletePipeline {
  private static final Logger LOG = LoggerFactory.getLogger(ClientAutocompletePipeline.class);

  /**
   * A DoFn that keys each product name by all of its prefixes.
   * This creates one row in the PCollection for each prefix<->product_name pair
   */
  private static class AllPrefixes
      extends DoFn<String, KV<String, String>> {
    private final int minPrefix;
    private final int maxPrefix;

    public AllPrefixes(int minPrefix) {
      this(minPrefix, 10);
    }

    public AllPrefixes(int minPrefix, int maxPrefix) {
      this.minPrefix = minPrefix;
      this.maxPrefix = maxPrefix;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      String productName = c.element().toString();
      for (int i = minPrefix; i <= Math.min(productName.length(), maxPrefix); i++) {
        c.output(KV.of(productName.substring(0, i), c.element()));
      }
    }
  }

  /**
   * Takes as input the top product names per prefix, and emits an entity
   * suitable for writing to Cloud Datastore.
   *
   */
  static class FormatForDatastore extends DoFn<KV<String, List<String>>, Entity> {
    private String kind;
    private String ancestorKey;

    public FormatForDatastore(String kind, String ancestorKey) {
      this.kind = kind;
      this.ancestorKey = ancestorKey;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      // Initialize an EntityBuilder and get it a valid key
      Entity.Builder entityBuilder = Entity.newBuilder();
      Key key = makeKey(kind, ancestorKey).build();
      entityBuilder.setKey(key);
      // New HashMap to hold all the properties of the Entity
      Map<String, Value> properties = new HashMap<>();
      String prefix = c.element().getKey();
      String productsString = "Products[";
      // iterate through the product names and add each one to the productsString
      for (String productName : c.element().getValue()) {
        // products.add(productName);
        productsString += productName + ", ";
      }
      productsString += "]";
      properties.put("prefix", makeValue(prefix).build());
      properties.put("products", makeValue(productsString).build());
      entityBuilder.putAllProperties(properties);
      c.output(entityBuilder.build());
    }
  }

  /**
   * Options supported by this class.
   *
   * <p>Inherits standard Beam example configuration options.
   */
  public interface Options
      extends AutocompleteOptions {
    @Description("Input text file")
    @Validation.Required
    String getInputFile();
    void setInputFile(String value);

    @Description("Cloud Datastore entity kind")
    @Default.String("prefix-product-map")
    String getKind();
    void setKind(String value);

    @Description("Whether output to Cloud Datastore")
    @Default.Boolean(true)
    Boolean getOutputToDatastore();
    void setOutputToDatastore(Boolean value);

    @Description("Cloud Datastore ancestor key")
    @Default.String("root")
    String getDatastoreAncestorKey();
    void setDatastoreAncestorKey(String value);

    @Description("Cloud Datastore output project ID, defaults to project ID")
    String getOutputProject();
    void setOutputProject(String value);
  }

  public static void main(String[] args) throws IOException{
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    // create the pipeline
    Pipeline p = Pipeline.create(options);
    PCollection<String> toWrite = p
        // A step to read in the product names from a text file on GCS
        .apply(TextIO.read().from("gs://sample-product-data/clean_product_names.txt"))
        // Next expand the product names into KV pairs with prefix as key (<KV<String, String>>)
        .apply("Explode Prefixes", ParDo.of(new AllPrefixes(2)))
        // Apply a GroupByKey transform to the PCollection "flatCollection" to create "productsGroupedByPrefix".
        .apply(GroupByKey.<String, String>create())
        // Now format the PCollection for writing into the Google Datastore
        .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(),
            options.getDatastoreAncestorKey()))
        // Write the processed data to the Google Cloud Datastore
        // NOTE: This is the line that I'm getting the error on!!
        .apply(DatastoreIO.v1().write().withProjectId(MoreObjects.firstNonNull(
            options.getOutputProject(), options.getOutputProject()))));
    // Run the pipeline.
    PipelineResult result = p.run();
  }
}
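For readers who want to check the prefix logic outside of Beam, steps 2 and 3 of the pseudo code can be sketched in plain Java. This is only an illustration under stated assumptions: the class name PrefixSketch and the methods prefixes and group are hypothetical stand-ins, an in-memory map replaces GroupByKey, and the sketch does not cap the list at 10 products per prefix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java stand-in for the AllPrefixes + GroupByKey steps.
class PrefixSketch {
    // Same loop bounds as AllPrefixes.processElement: prefixes of length
    // minPrefix up to min(name.length(), maxPrefix), inclusive.
    static List<String> prefixes(String name, int minPrefix, int maxPrefix) {
        List<String> out = new ArrayList<>();
        for (int i = minPrefix; i <= Math.min(name.length(), maxPrefix); i++) {
            out.add(name.substring(0, i));
        }
        return out;
    }

    // In-memory replacement for GroupByKey: collect product names per prefix.
    static Map<String, List<String>> group(List<String> names, int minPrefix, int maxPrefix) {
        Map<String, List<String>> byPrefix = new TreeMap<>();
        for (String name : names) {
            for (String prefix : prefixes(name, minPrefix, maxPrefix)) {
                byPrefix.computeIfAbsent(prefix, k -> new ArrayList<>()).add(name);
            }
        }
        return byPrefix;
    }

    public static void main(String[] args) {
        // Prints each prefix with the product names that share it.
        System.out.println(group(Arrays.asList("apple", "apricot"), 2, 10));
    }
}
```

A name shorter than minPrefix yields no prefixes at all, which matches the loop in AllPrefixes.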
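Thank you very much for your input. Are you saying I need to add a close paren after 'options.getDatastoreAncestorKey()))'? If so, when I do that I get a red syntax error underlining the entire apply I just added it to, which tells me: 'Type mismatch: cannot convert from ParDo.SingleOutput<KV<String,List<String>>,Entity> to PTransform<? super PCollection<KV<String,Iterable<String>>>,OutputT>' –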
MrSimmonsSr
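Yes, you need to add that paren. The latter error says you have a transform that expects a collection of 'KV<String, List<String>>' as input, but you are giving it a collection of 'KV<String, Iterable<String>>'. The FormatForDatastore DoFn should extend 'DoFn<KV<String, Iterable<String>>, Entity>'. –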
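The type mismatch comes down to Java generics being invariant: a function declared over KV<String, List<String>> cannot be applied to KV<String, Iterable<String>> elements, even though every List is an Iterable. The sketch below illustrates this without Beam. Everything in it is a hypothetical stand-in (Pair for KV, grouped for the output of a group-by-key step, FORMAT for the formatting DoFn), so treat it as an analogy, not as Beam's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins; none of these names come from Beam.
class GroupedTypeSketch {
    // Minimal stand-in for a key/value element such as KV<K, V>.
    static final class Pair<K, V> {
        final K key;
        final V value;
        Pair(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in for grouped output: the values are exposed as Iterable, not List.
    static Pair<String, Iterable<String>> grouped(String key, String... values) {
        return new Pair<>(key, Arrays.asList(values));
    }

    // Declared against Iterable<String>, so it accepts grouped output directly.
    static final Function<Pair<String, Iterable<String>>, String> FORMAT = kv -> {
        List<String> names = new ArrayList<>();
        for (String name : kv.value) {
            names.add(name);
        }
        return kv.key + "=" + String.join(", ", names);
    };

    // A function declared as Function<Pair<String, List<String>>, String>
    // would be rejected by the compiler if applied to grouped(...), because
    // Pair<String, Iterable<String>> is not a Pair<String, List<String>>.

    public static void main(String[] args) {
        System.out.println(FORMAT.apply(grouped("ap", "apple", "apricot")));
    }
}
```

In the pipeline above, the analogous change is the one the comment describes: declare FormatForDatastore against Iterable<String>. The loop over c.element().getValue() works unchanged, since a for-each loop only needs an Iterable.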