我正在尝试创建和部署Dataflow管道,以将从Cloud Pubsub主题收集的数据流式传输到Cloud Datastore。不过,我一直运行到错误如下: 找不到符号符号:变量DatastoreIO位置:类DataUpload从Cloud Pubsub到数据存储区的数据流管道
我不明白我在做什么错了,因为我也跟着下面的文件导入相关包 - DatastoreV1以及后面的示例代码的语法:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/datastore/DatastoreV1
任何帮助将大大缓解我已经做了这些几天的头撞。在此先感谢!
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1;
public class DataUpload{
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
//options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.Read.topic("projects/{project_name}/topics/data"))
.apply(DatastoreIO.v1().write().withProjectId(projectId));
p.run();
}
}
更新:我已经包括下面的线,得到了错误的一个新的一群。
import com.google.cloud.dataflow.sdk.io.datastore.DatastoreIO;
DataUpload.java:[30,14]找到的应用(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write) 方法com.google.cloud.dataflow没有合适的方法.sdk.values.PCollection.apply(com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)不适用 (无法推断类型变量OutputT (参数不匹配; com.google.cloud .dataflow.sdk.io.datastore.DatastoreV1.Write无法转换为com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)) com.google.cloud.dataflow.sdk.values.PCollection方法。 apply(java.lang.String,com.google.cloud.dataflow.sdk.transforms.PTransform,OutputT>)不适用 (无法推断类型变量OutputT (实际和正式参数列表长度不同))
感谢您的建议。最初我使用了这条线,但还有一些其他错误提示DatastoreV1丢失。所以我同时添加了两个 'import com.google.cloud.dataflow.sdk.io.DatastoreIO' 'import com.google.cloud.dataflow.sdk.io.DatastoreV1' 现在我得到了另一堆引用该行的错误该方法被调用,“没有找到合适的方法(com.google.cloud.dataflow.sdk.io.datastore.DatastoreV1.Write)”等 – jlyh
我认为问题是您的PCollection数据类型不匹配。 PubSubIO返回PCollection https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/PubsubIO.Read DatstoreIO接受PCollection https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO 您需要在中间引入一个ParDo,以便从PubSubIO的字符串值存储到数据存储实体对象类型。 https://cloud.google。com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO –
此外,请看这里的示例,您还需要指定数据集以写入数据存储库 https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/DatastoreIO –